1import gc
2import sys
3import doctest
4import unittest
5import collections
6import weakref
7import operator
8import contextlib
9import copy
10import threading
11import time
12import random
13
14from test import support
15from test.support import script_helper, ALWAYS_EQ
16from test.support import gc_collect
17from test.support import threading_helper
18
# Used in ReferencesTestCase.test_ref_created_during_del(): the __del__
# method defined there stores a weakref to the dying object in this global.
ref_from_del = None

# Used by FinalizeTestCase as a global that may be replaced by None
# when the interpreter shuts down.
_global_var = 'foobar'
25
class C:
    """Minimal weak-referenceable class with a single do-nothing method."""

    def method(self):
        """Do nothing and return None; lets tests create bound methods."""
        return None
29
30
class Callable:
    """Callable test object that remembers its latest argument in ``bar``."""

    # Class-level default; shadowed on the instance by the first call.
    bar = None

    def __call__(self, x):
        """Record *x* on the instance as ``bar``."""
        setattr(self, "bar", x)
36
37
def create_function():
    """Return a newly created function object that does nothing."""
    def f():
        return None
    return f
41
def create_bound_method():
    """Return ``method`` bound to a fresh ``C`` instance."""
    instance = C()
    return instance.method
44
45
class Object:
    """Value wrapper used as a weak-referenceable test object.

    Equality, ordering (``<`` only) and hashing are all delegated to the
    wrapped ``arg`` value, so two instances built from equal arguments
    compare equal and hash alike.
    """

    def __init__(self, arg):
        self.arg = arg

    def __repr__(self):
        # Wrap the value in a 1-tuple: with a bare ``% self.arg`` a tuple
        # argument would be unpacked as multiple format operands and either
        # raise TypeError or show only part of the value.
        return "<Object %r>" % (self.arg,)

    def __eq__(self, other):
        # Only comparable with other Object instances; defer otherwise.
        if isinstance(other, Object):
            return self.arg == other.arg
        return NotImplemented

    def __lt__(self, other):
        # Only orderable against other Object instances; defer otherwise.
        if isinstance(other, Object):
            return self.arg < other.arg
        return NotImplemented

    def __hash__(self):
        # Kept consistent with __eq__: equal args give equal hashes.
        return hash(self.arg)

    def some_method(self):
        """Return the constant 4."""
        return 4

    def other_method(self):
        """Return the constant 5."""
        return 5
65
66
class RefCycle:
    """Object that refers to itself through its ``cycle`` attribute."""

    def __init__(self):
        # The self-reference puts every instance in a reference cycle.
        setattr(self, "cycle", self)
70
71
class TestBase(unittest.TestCase):
    """Shared base class that counts weakref callback invocations."""

    def setUp(self):
        """Reset the callback counter before each test."""
        self.cbcalled = 0

    def callback(self, ref):
        """Weakref callback: *ref* is ignored, only the call is counted."""
        self.cbcalled = self.cbcalled + 1
79
80
@contextlib.contextmanager
def collect_in_thread(period=0.0001):
    """
    Run ``gc.collect()`` over and over in a helper thread.

    While the context is active, ``support.disable_gc()`` is in effect and
    the helper thread sleeps *period* seconds before each collection.  On
    exit the helper thread is told to stop and is joined.
    """
    stop_requested = threading.Event()

    def collect():
        # Sleep first, then collect, until the context manager exits.
        while not stop_requested.is_set():
            time.sleep(period)
            gc.collect()

    with support.disable_gc():
        collector = threading.Thread(target=collect)
        collector.start()
        try:
            yield
        finally:
            stop_requested.set()
            collector.join()
101
102
103class ReferencesTestCase(TestBase):
104
105    def test_basic_ref(self):
106        self.check_basic_ref(C)
107        self.check_basic_ref(create_function)
108        self.check_basic_ref(create_bound_method)
109
110        # Just make sure the tp_repr handler doesn't raise an exception.
111        # Live reference:
112        o = C()
113        wr = weakref.ref(o)
114        repr(wr)
115        # Dead reference:
116        del o
117        repr(wr)
118
119    def test_repr_failure_gh99184(self):
120        class MyConfig(dict):
121            def __getattr__(self, x):
122                return self[x]
123
124        obj = MyConfig(offset=5)
125        obj_weakref = weakref.ref(obj)
126
127        self.assertIn('MyConfig', repr(obj_weakref))
128        self.assertIn('MyConfig', str(obj_weakref))
129
130    def test_basic_callback(self):
131        self.check_basic_callback(C)
132        self.check_basic_callback(create_function)
133        self.check_basic_callback(create_bound_method)
134
135    @support.cpython_only
136    def test_cfunction(self):
137        import _testcapi
138        create_cfunction = _testcapi.create_cfunction
139        f = create_cfunction()
140        wr = weakref.ref(f)
141        self.assertIs(wr(), f)
142        del f
143        self.assertIsNone(wr())
144        self.check_basic_ref(create_cfunction)
145        self.check_basic_callback(create_cfunction)
146
147    def test_multiple_callbacks(self):
148        o = C()
149        ref1 = weakref.ref(o, self.callback)
150        ref2 = weakref.ref(o, self.callback)
151        del o
152        gc_collect()  # For PyPy or other GCs.
153        self.assertIsNone(ref1(), "expected reference to be invalidated")
154        self.assertIsNone(ref2(), "expected reference to be invalidated")
155        self.assertEqual(self.cbcalled, 2,
156                     "callback not called the right number of times")
157
    def test_multiple_selfref_callbacks(self):
        """A callback may safely call another ref to the same dying object."""
        # Make sure all references are invalidated before callbacks are called
        #
        # What's important here is that we're using the first
        # reference in the callback invoked on the second reference
        # (the most recently created ref is cleaned up first).  This
        # tests that all references to the object are invalidated
        # before any of the callbacks are invoked, so that we only
        # have one invocation of _weakref.c:cleanup_helper() active
        # for a particular object at a time.
        #
        def callback(object, self=self):
            # Dereference the *first* ref from inside a callback.
            self.ref()
        c = C()
        self.ref = weakref.ref(c, callback)
        # ref1 is never read; it only needs to exist as a second ref with
        # the same callback when c is deleted.
        ref1 = weakref.ref(c, callback)
        del c
175
176    def test_constructor_kwargs(self):
177        c = C()
178        self.assertRaises(TypeError, weakref.ref, c, callback=None)
179
180    def test_proxy_ref(self):
181        o = C()
182        o.bar = 1
183        ref1 = weakref.proxy(o, self.callback)
184        ref2 = weakref.proxy(o, self.callback)
185        del o
186        gc_collect()  # For PyPy or other GCs.
187
188        def check(proxy):
189            proxy.bar
190
191        self.assertRaises(ReferenceError, check, ref1)
192        self.assertRaises(ReferenceError, check, ref2)
193        ref3 = weakref.proxy(C())
194        gc_collect()  # For PyPy or other GCs.
195        self.assertRaises(ReferenceError, bool, ref3)
196        self.assertEqual(self.cbcalled, 2)
197
198    def check_basic_ref(self, factory):
199        o = factory()
200        ref = weakref.ref(o)
201        self.assertIsNotNone(ref(),
202                     "weak reference to live object should be live")
203        o2 = ref()
204        self.assertIs(o, o2,
205                     "<ref>() should return original object if live")
206
207    def check_basic_callback(self, factory):
208        self.cbcalled = 0
209        o = factory()
210        ref = weakref.ref(o, self.callback)
211        del o
212        gc_collect()  # For PyPy or other GCs.
213        self.assertEqual(self.cbcalled, 1,
214                     "callback did not properly set 'cbcalled'")
215        self.assertIsNone(ref(),
216                     "ref2 should be dead after deleting object reference")
217
218    def test_ref_reuse(self):
219        o = C()
220        ref1 = weakref.ref(o)
221        # create a proxy to make sure that there's an intervening creation
222        # between these two; it should make no difference
223        proxy = weakref.proxy(o)
224        ref2 = weakref.ref(o)
225        self.assertIs(ref1, ref2,
226                     "reference object w/out callback should be re-used")
227
228        o = C()
229        proxy = weakref.proxy(o)
230        ref1 = weakref.ref(o)
231        ref2 = weakref.ref(o)
232        self.assertIs(ref1, ref2,
233                     "reference object w/out callback should be re-used")
234        self.assertEqual(weakref.getweakrefcount(o), 2,
235                     "wrong weak ref count for object")
236        del proxy
237        gc_collect()  # For PyPy or other GCs.
238        self.assertEqual(weakref.getweakrefcount(o), 1,
239                     "wrong weak ref count for object after deleting proxy")
240
241    def test_proxy_reuse(self):
242        o = C()
243        proxy1 = weakref.proxy(o)
244        ref = weakref.ref(o)
245        proxy2 = weakref.proxy(o)
246        self.assertIs(proxy1, proxy2,
247                     "proxy object w/out callback should have been re-used")
248
249    def test_basic_proxy(self):
250        o = C()
251        self.check_proxy(o, weakref.proxy(o))
252
253        L = collections.UserList()
254        p = weakref.proxy(L)
255        self.assertFalse(p, "proxy for empty UserList should be false")
256        p.append(12)
257        self.assertEqual(len(L), 1)
258        self.assertTrue(p, "proxy for non-empty UserList should be true")
259        p[:] = [2, 3]
260        self.assertEqual(len(L), 2)
261        self.assertEqual(len(p), 2)
262        self.assertIn(3, p, "proxy didn't support __contains__() properly")
263        p[1] = 5
264        self.assertEqual(L[1], 5)
265        self.assertEqual(p[1], 5)
266        L2 = collections.UserList(L)
267        p2 = weakref.proxy(L2)
268        self.assertEqual(p, p2)
269        ## self.assertEqual(repr(L2), repr(p2))
270        L3 = collections.UserList(range(10))
271        p3 = weakref.proxy(L3)
272        self.assertEqual(L3[:], p3[:])
273        self.assertEqual(L3[5:], p3[5:])
274        self.assertEqual(L3[:5], p3[:5])
275        self.assertEqual(L3[2:5], p3[2:5])
276
277    def test_proxy_unicode(self):
278        # See bug 5037
279        class C(object):
280            def __str__(self):
281                return "string"
282            def __bytes__(self):
283                return b"bytes"
284        instance = C()
285        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
286        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
287
288    def test_proxy_index(self):
289        class C:
290            def __index__(self):
291                return 10
292        o = C()
293        p = weakref.proxy(o)
294        self.assertEqual(operator.index(p), 10)
295
296    def test_proxy_div(self):
297        class C:
298            def __floordiv__(self, other):
299                return 42
300            def __ifloordiv__(self, other):
301                return 21
302        o = C()
303        p = weakref.proxy(o)
304        self.assertEqual(p // 5, 42)
305        p //= 5
306        self.assertEqual(p, 21)
307
308    def test_proxy_matmul(self):
309        class C:
310            def __matmul__(self, other):
311                return 1729
312            def __rmatmul__(self, other):
313                return -163
314            def __imatmul__(self, other):
315                return 561
316        o = C()
317        p = weakref.proxy(o)
318        self.assertEqual(p @ 5, 1729)
319        self.assertEqual(5 @ p, -163)
320        p @= 5
321        self.assertEqual(p, 561)
322
    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared by all callers for as long as they are alive.  In
    # Python 2.3.3 and earlier, this guarantee was not honored, and was
    # broken in different ways for PyWeakref_NewRef() and
    # PyWeakref_NewProxy().  (Two tests.)
330
331    def test_shared_ref_without_callback(self):
332        self.check_shared_without_callback(weakref.ref)
333
334    def test_shared_proxy_without_callback(self):
335        self.check_shared_without_callback(weakref.proxy)
336
337    def check_shared_without_callback(self, makeref):
338        o = Object(1)
339        p1 = makeref(o, None)
340        p2 = makeref(o, None)
341        self.assertIs(p1, p2, "both callbacks were None in the C API")
342        del p1, p2
343        p1 = makeref(o)
344        p2 = makeref(o, None)
345        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
346        del p1, p2
347        p1 = makeref(o)
348        p2 = makeref(o)
349        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
350        del p1, p2
351        p1 = makeref(o, None)
352        p2 = makeref(o)
353        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
354
355    def test_callable_proxy(self):
356        o = Callable()
357        ref1 = weakref.proxy(o)
358
359        self.check_proxy(o, ref1)
360
361        self.assertIs(type(ref1), weakref.CallableProxyType,
362                     "proxy is not of callable type")
363        ref1('twinkies!')
364        self.assertEqual(o.bar, 'twinkies!',
365                     "call through proxy not passed through to original")
366        ref1(x='Splat.')
367        self.assertEqual(o.bar, 'Splat.',
368                     "call through proxy not passed through to original")
369
370        # expect due to too few args
371        self.assertRaises(TypeError, ref1)
372
373        # expect due to too many args
374        self.assertRaises(TypeError, ref1, 1, 2, 3)
375
376    def check_proxy(self, o, proxy):
377        o.foo = 1
378        self.assertEqual(proxy.foo, 1,
379                     "proxy does not reflect attribute addition")
380        o.foo = 2
381        self.assertEqual(proxy.foo, 2,
382                     "proxy does not reflect attribute modification")
383        del o.foo
384        self.assertFalse(hasattr(proxy, 'foo'),
385                     "proxy does not reflect attribute removal")
386
387        proxy.foo = 1
388        self.assertEqual(o.foo, 1,
389                     "object does not reflect attribute addition via proxy")
390        proxy.foo = 2
391        self.assertEqual(o.foo, 2,
392            "object does not reflect attribute modification via proxy")
393        del proxy.foo
394        self.assertFalse(hasattr(o, 'foo'),
395                     "object does not reflect attribute removal via proxy")
396
397    def test_proxy_deletion(self):
398        # Test clearing of SF bug #762891
399        class Foo:
400            result = None
401            def __delitem__(self, accessor):
402                self.result = accessor
403        g = Foo()
404        f = weakref.proxy(g)
405        del f[0]
406        self.assertEqual(f.result, 0)
407
408    def test_proxy_bool(self):
409        # Test clearing of SF bug #1170766
410        class List(list): pass
411        lyst = List()
412        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
413
    def test_proxy_iter(self):
        """``in`` on a proxy whose referent is deleted inside ``__iter__``."""
        # Test fails with a debug build of the interpreter
        # (see bpo-38395).

        obj = None

        class MyObj:
            def __iter__(self):
                # Drop the test's only named reference to the object while
                # its own __iter__ is still running.
                nonlocal obj
                del obj
                return NotImplemented

        obj = MyObj()
        p = weakref.proxy(obj)
        with self.assertRaises(TypeError):
            # "blech" in p calls MyObj.__iter__ through the proxy,
            # without keeping a reference to the real object, so it
            # can be killed in the middle of the call
            "blech" in p
433
434    def test_proxy_next(self):
435        arr = [4, 5, 6]
436        def iterator_func():
437            yield from arr
438        it = iterator_func()
439
440        class IteratesWeakly:
441            def __iter__(self):
442                return weakref.proxy(it)
443
444        weak_it = IteratesWeakly()
445
446        # Calls proxy.__next__
447        self.assertEqual(list(weak_it), [4, 5, 6])
448
449    def test_proxy_bad_next(self):
450        # bpo-44720: PyIter_Next() shouldn't be called if the reference
451        # isn't an iterator.
452
453        not_an_iterator = lambda: 0
454
455        class A:
456            def __iter__(self):
457                return weakref.proxy(not_an_iterator)
458        a = A()
459
460        msg = "Weakref proxy referenced a non-iterator"
461        with self.assertRaisesRegex(TypeError, msg):
462            list(a)
463
464    def test_proxy_reversed(self):
465        class MyObj:
466            def __len__(self):
467                return 3
468            def __reversed__(self):
469                return iter('cba')
470
471        obj = MyObj()
472        self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba")
473
474    def test_proxy_hash(self):
475        class MyObj:
476            def __hash__(self):
477                return 42
478
479        obj = MyObj()
480        with self.assertRaises(TypeError):
481            hash(weakref.proxy(obj))
482
483        class MyObj:
484            __hash__ = None
485
486        obj = MyObj()
487        with self.assertRaises(TypeError):
488            hash(weakref.proxy(obj))
489
490    def test_getweakrefcount(self):
491        o = C()
492        ref1 = weakref.ref(o)
493        ref2 = weakref.ref(o, self.callback)
494        self.assertEqual(weakref.getweakrefcount(o), 2,
495                     "got wrong number of weak reference objects")
496
497        proxy1 = weakref.proxy(o)
498        proxy2 = weakref.proxy(o, self.callback)
499        self.assertEqual(weakref.getweakrefcount(o), 4,
500                     "got wrong number of weak reference objects")
501
502        del ref1, ref2, proxy1, proxy2
503        gc_collect()  # For PyPy or other GCs.
504        self.assertEqual(weakref.getweakrefcount(o), 0,
505                     "weak reference objects not unlinked from"
506                     " referent when discarded.")
507
508        # assumes ints do not support weakrefs
509        self.assertEqual(weakref.getweakrefcount(1), 0,
510                     "got wrong number of weak reference objects for int")
511
512    def test_getweakrefs(self):
513        o = C()
514        ref1 = weakref.ref(o, self.callback)
515        ref2 = weakref.ref(o, self.callback)
516        del ref1
517        gc_collect()  # For PyPy or other GCs.
518        self.assertEqual(weakref.getweakrefs(o), [ref2],
519                     "list of refs does not match")
520
521        o = C()
522        ref1 = weakref.ref(o, self.callback)
523        ref2 = weakref.ref(o, self.callback)
524        del ref2
525        gc_collect()  # For PyPy or other GCs.
526        self.assertEqual(weakref.getweakrefs(o), [ref1],
527                     "list of refs does not match")
528
529        del ref1
530        gc_collect()  # For PyPy or other GCs.
531        self.assertEqual(weakref.getweakrefs(o), [],
532                     "list of refs not cleared")
533
534        # assumes ints do not support weakrefs
535        self.assertEqual(weakref.getweakrefs(1), [],
536                     "list of refs does not match for int")
537
538    def test_newstyle_number_ops(self):
539        class F(float):
540            pass
541        f = F(2.0)
542        p = weakref.proxy(f)
543        self.assertEqual(p + 1.0, 3.0)
544        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
545
546    def test_callbacks_protected(self):
547        # Callbacks protected from already-set exceptions?
548        # Regression test for SF bug #478534.
549        class BogusError(Exception):
550            pass
551        data = {}
552        def remove(k):
553            del data[k]
554        def encapsulate():
555            f = lambda : ()
556            data[weakref.ref(f, remove)] = None
557            raise BogusError
558        try:
559            encapsulate()
560        except BogusError:
561            pass
562        else:
563            self.fail("exception not properly restored")
564        try:
565            encapsulate()
566        except BogusError:
567            pass
568        else:
569            self.fail("exception not properly restored")
570
    def test_sf_bug_840829(self):
        """Regression test: weakref callbacks that run gc during deallocation."""
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.

        import gc

        class C(object):
            pass

        # Part one: the callback on c itself runs a collection.
        c = C()
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        # Part two: the callback hangs off c1.i, an object owned by c1,
        # which in turn is owned by c2.
        c1 = C()
        c1.i = C()
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2
610
    def test_callback_in_cycle(self):
        """Collecting a cycle that contains a weakref and its callback is safe."""
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                # Attribute lookup on self goes through the class first.
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()
650
    def test_callback_reachable_one_way(self):
        """Like test_callback_in_cycle, but c1 cannot reach the callback's owner."""
        import gc

        # This one broke the first patch that fixed the previous test. In this case,
        # the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.

        class C:
            def cb(self, ignore):
                # Touch every attribute stored in the instance __dict__.
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()
674
    def test_callback_different_classes(self):
        """Like test_callback_reachable_one_way, with two distinct classes."""
        import gc

        # Like test_callback_reachable_one_way, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).

        class C(object):
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        # The classes are deleted too, so they become trash with the
        # instances.
        del c1, c2, C, D
        gc.collect()
702
    def test_callback_in_cycle_resurrection(self):
        """Callbacks of weakrefs inside cyclic trash must not run."""
        import gc

        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        # Records every callback that actually runs.
        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                # Would resurrect the partner object if it ever ran.
                alist.append(self.c)

        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertEqual(wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
748
    def test_callbacks_on_callback(self):
        """Weakrefs whose referent is itself a weakref callback."""
        import gc

        # Set up weakref callbacks *on* weakref callbacks.
        # alist records every callback that actually runs.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        c, d = C(), C()
        c.other = d
        d.other = c
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assertIs(external_wr(), callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertEqual(external_wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
786
787    def test_gc_during_ref_creation(self):
788        self.check_gc_during_creation(weakref.ref)
789
790    def test_gc_during_proxy_creation(self):
791        self.check_gc_during_creation(weakref.proxy)
792
793    def check_gc_during_creation(self, makeref):
794        thresholds = gc.get_threshold()
795        gc.set_threshold(1, 1, 1)
796        gc.collect()
797        class A:
798            pass
799
800        def callback(*args):
801            pass
802
803        referenced = A()
804
805        a = A()
806        a.a = a
807        a.wr = makeref(referenced)
808
809        try:
810            # now make sure the object and the ref get labeled as
811            # cyclic trash:
812            a = A()
813            weakref.ref(referenced, callback)
814
815        finally:
816            gc.set_threshold(*thresholds)
817
    def test_ref_created_during_del(self):
        """Creating a weakref to ``self`` inside ``__del__`` must not crash."""
        # Bug #1377858
        # A weakref created in an object's __del__() would crash the
        # interpreter when the weakref was cleaned up since it would refer to
        # non-existent memory.  This test should not segfault the interpreter.
        class Target(object):
            def __del__(self):
                # Store the new weakref in the module-level global.
                global ref_from_del
                ref_from_del = weakref.ref(self)

        # There are no assertions: the instance is only created here and
        # is expected to be finalized once the local goes away.
        w = Target()
829
830    def test_init(self):
831        # Issue 3634
832        # <weakref to class>.__init__() doesn't check errors correctly
833        r = weakref.ref(Exception)
834        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
835        # No exception should be raised here
836        gc.collect()
837
838    def test_classes(self):
839        # Check that classes are weakrefable.
840        class A(object):
841            pass
842        l = []
843        weakref.ref(int)
844        a = weakref.ref(A, l.append)
845        A = None
846        gc.collect()
847        self.assertEqual(a(), None)
848        self.assertEqual(l, [a])
849
850    def test_equality(self):
851        # Alive weakrefs defer equality testing to their underlying object.
852        x = Object(1)
853        y = Object(1)
854        z = Object(2)
855        a = weakref.ref(x)
856        b = weakref.ref(y)
857        c = weakref.ref(z)
858        d = weakref.ref(x)
859        # Note how we directly test the operators here, to stress both
860        # __eq__ and __ne__.
861        self.assertTrue(a == b)
862        self.assertFalse(a != b)
863        self.assertFalse(a == c)
864        self.assertTrue(a != c)
865        self.assertTrue(a == d)
866        self.assertFalse(a != d)
867        self.assertFalse(a == x)
868        self.assertTrue(a != x)
869        self.assertTrue(a == ALWAYS_EQ)
870        self.assertFalse(a != ALWAYS_EQ)
871        del x, y, z
872        gc.collect()
873        for r in a, b, c:
874            # Sanity check
875            self.assertIs(r(), None)
876        # Dead weakrefs compare by identity: whether `a` and `d` are the
877        # same weakref object is an implementation detail, since they pointed
878        # to the same original object and didn't have a callback.
879        # (see issue #16453).
880        self.assertFalse(a == b)
881        self.assertTrue(a != b)
882        self.assertFalse(a == c)
883        self.assertTrue(a != c)
884        self.assertEqual(a == d, a is d)
885        self.assertEqual(a != d, a is not d)
886
887    def test_ordering(self):
888        # weakrefs cannot be ordered, even if the underlying objects can.
889        ops = [operator.lt, operator.gt, operator.le, operator.ge]
890        x = Object(1)
891        y = Object(1)
892        a = weakref.ref(x)
893        b = weakref.ref(y)
894        for op in ops:
895            self.assertRaises(TypeError, op, a, b)
896        # Same when dead.
897        del x, y
898        gc.collect()
899        for op in ops:
900            self.assertRaises(TypeError, op, a, b)
901
902    def test_hashing(self):
903        # Alive weakrefs hash the same as the underlying object
904        x = Object(42)
905        y = Object(42)
906        a = weakref.ref(x)
907        b = weakref.ref(y)
908        self.assertEqual(hash(a), hash(42))
909        del x, y
910        gc.collect()
911        # Dead weakrefs:
912        # - retain their hash is they were hashed when alive;
913        # - otherwise, cannot be hashed.
914        self.assertEqual(hash(a), hash(42))
915        self.assertRaises(TypeError, hash, b)
916
917    def test_trashcan_16602(self):
918        # Issue #16602: when a weakref's target was part of a long
919        # deallocation chain, the trashcan mechanism could delay clearing
920        # of the weakref and make the target object visible from outside
921        # code even though its refcount had dropped to 0.  A crash ensued.
922        class C:
923            def __init__(self, parent):
924                if not parent:
925                    return
926                wself = weakref.ref(self)
927                def cb(wparent):
928                    o = wself()
929                self.wparent = weakref.ref(parent, cb)
930
931        d = weakref.WeakKeyDictionary()
932        root = c = C(None)
933        for n in range(100):
934            d[c] = c = C(c)
935        del root
936        gc.collect()
937
938    def test_callback_attribute(self):
939        x = Object(1)
940        callback = lambda ref: None
941        ref1 = weakref.ref(x, callback)
942        self.assertIs(ref1.__callback__, callback)
943
944        ref2 = weakref.ref(x)
945        self.assertIsNone(ref2.__callback__)
946
947    def test_callback_attribute_after_deletion(self):
948        x = Object(1)
949        ref = weakref.ref(x, self.callback)
950        self.assertIsNotNone(ref.__callback__)
951        del x
952        support.gc_collect()
953        self.assertIsNone(ref.__callback__)
954
955    def test_set_callback_attribute(self):
956        x = Object(1)
957        callback = lambda ref: None
958        ref1 = weakref.ref(x, callback)
959        with self.assertRaises(AttributeError):
960            ref1.__callback__ = lambda ref: None
961
962    def test_callback_gcs(self):
963        class ObjectWithDel(Object):
964            def __del__(self): pass
965        x = ObjectWithDel(1)
966        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
967        del x
968        support.gc_collect()
969
970
class SubclassableWeakrefTestCase(TestBase):
    # Tests for user-defined subclasses of weakref.ref.

    def test_subclass_refs(self):
        # A subclass may carry extra state and override __call__.
        class TrackingRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super().__init__(ob, callback)
            def __call__(self):
                self.called = True
                return super().__call__()
        target = Object("foo")
        tracked = TrackingRef(target, value=24)
        self.assertIs(tracked(), target)
        self.assertTrue(tracked.called)
        self.assertEqual(tracked.value, 24)
        del target
        gc_collect()  # For PyPy or other GCs.
        self.assertIsNone(tracked())
        self.assertTrue(tracked.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        # Subclass instances coexist with the plain weakref to the same
        # object: each one is counted and listed separately.
        class MyRef(weakref.ref):
            pass
        target = Object(42)
        sub_ref = MyRef(target)
        plain_ref = weakref.ref(target)
        self.assertIsNot(sub_ref, plain_ref)
        self.assertEqual(weakref.getweakrefs(target), [plain_ref, sub_ref])
        self.assertEqual(weakref.getweakrefcount(target), 2)
        other_sub_ref = MyRef(target)
        self.assertEqual(weakref.getweakrefcount(target), 3)
        all_refs = weakref.getweakrefs(target)
        self.assertEqual(len(all_refs), 3)
        self.assertIs(plain_ref, all_refs[0])
        trailing = all_refs[1:]
        self.assertIn(sub_ref, trailing)
        self.assertIn(other_sub_ref, trailing)

    def test_subclass_refs_dont_conflate_callbacks(self):
        # Two subclass refs with different callbacks are distinct objects
        # and are both reported by getweakrefs().
        class MyRef(weakref.ref):
            pass
        target = Object(42)
        id_ref = MyRef(target, id)
        str_ref = MyRef(target, str)
        self.assertIsNot(id_ref, str_ref)
        all_refs = weakref.getweakrefs(target)
        self.assertIn(id_ref, all_refs)
        self.assertIn(str_ref, all_refs)

    def test_subclass_refs_with_slots(self):
        # A subclass declaring __slots__ stores its extra attributes in the
        # slots and gets no instance __dict__.
        class SlottedRef(weakref.ref):
            __slots__ = "slot1", "slot2"
            def __new__(cls, ob, callback, slot1, slot2):
                return weakref.ref.__new__(cls, ob, callback)
            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2
            def meth(self):
                return self.slot1 + self.slot2
        target = Object(42)
        slotted = SlottedRef(target, None, "abc", "def")
        self.assertEqual(slotted.slot1, "abc")
        self.assertEqual(slotted.slot2, "def")
        self.assertEqual(slotted.meth(), "abcdef")
        self.assertFalse(hasattr(slotted, "__dict__"))

    def test_subclass_refs_with_cycle(self):
        """Confirm https://bugs.python.org/issue3100 is fixed."""
        # Instances of a weakref subclass can have attributes.  When such
        # an instance holds the only strong reference to its own referent,
        # deleting the weakref deletes the referent as well.  The callback
        # must not run in that case, since the ref object itself is being
        # deleted.
        class MyRef(weakref.ref):
            pass

        # A local callback lets "regrtest -R::" detect refcounting
        # problems.
        def bump_counter(wr):
            self.cbcalled += 1

        target = C()
        holder = MyRef(target, bump_counter)
        holder.o = target
        del target

        del holder # Used to crash here

        self.assertEqual(self.cbcalled, 0)

        # Same scenario with two weakrefs to the same object, because the
        # code paths are different.
        target = C()
        outer = MyRef(target, bump_counter)
        inner = MyRef(target, bump_counter)
        outer.r = inner
        inner.o = target
        del target
        del inner

        del outer # Used to crash here

        self.assertEqual(self.cbcalled, 0)
1073
1074
class WeakMethodTestCase(unittest.TestCase):
    # Tests for weakref.WeakMethod.

    def _subclass(self):
        """Return a fresh Object subclass that overrides `some_method`."""
        class Sub(Object):
            def some_method(self):
                return 6
        return Sub

    def test_alive(self):
        # Calling a live WeakMethod yields a bound method equivalent to
        # the one it was created from.
        target = Object(1)
        wm = weakref.WeakMethod(target.some_method)
        self.assertIsInstance(wm, weakref.ReferenceType)
        bound = wm()
        self.assertIsInstance(bound, type(target.some_method))
        self.assertIs(bound.__self__, target)
        self.assertIs(bound.__func__, target.some_method.__func__)
        self.assertEqual(wm()(), 4)

    def test_object_dead(self):
        # The WeakMethod is dead once the instance is gone.
        target = Object(1)
        wm = weakref.WeakMethod(target.some_method)
        del target
        gc.collect()
        self.assertIsNone(wm())

    def test_method_dead(self):
        # The WeakMethod is dead once the function is removed from the
        # class, even though the instance is still alive.
        cls = self._subclass()
        target = cls(1)
        wm = weakref.WeakMethod(target.some_method)
        del cls.some_method
        gc.collect()
        self.assertIsNone(wm())

    def test_callback_when_object_dead(self):
        # Callback behaviour when the instance dies first.
        cls = self._subclass()
        received = []
        def record(arg):
            received.append(arg)
        target = cls(1)
        wm = weakref.WeakMethod(target.some_method, record)
        del target
        gc.collect()
        self.assertEqual(received, [wm])
        # Replacing the function afterwards must not trigger the callback
        # a second time.
        cls.some_method = Object.some_method
        gc.collect()
        self.assertEqual(received, [wm])

    def test_callback_when_method_dead(self):
        # Callback behaviour when the function dies first.
        cls = self._subclass()
        received = []
        def record(arg):
            received.append(arg)
        target = cls(1)
        wm = weakref.WeakMethod(target.some_method, record)
        del cls.some_method
        gc.collect()
        self.assertEqual(received, [wm])
        # The later death of the instance must not trigger the callback a
        # second time.
        del target
        gc.collect()
        self.assertEqual(received, [wm])

    @support.cpython_only
    def test_no_cycles(self):
        # A WeakMethod must not be part of a reference cycle: dropping the
        # last reference to it frees it without a garbage collection.
        target = Object(1)
        def ignore(_):
            pass
        wm = weakref.WeakMethod(target.some_method, ignore)
        wm_ref = weakref.ref(wm)
        del wm
        self.assertIsNone(wm_ref())

    def test_equality(self):
        # Both == and != are exercised directly by the two helpers.
        def check_equal(left, right):
            self.assertTrue(left == right)
            self.assertFalse(left != right)
        def check_unequal(left, right):
            self.assertTrue(left != right)
            self.assertFalse(left == right)
        x = Object(1)
        y = Object(1)
        x_some = weakref.WeakMethod(x.some_method)
        y_some = weakref.WeakMethod(y.some_method)
        x_other = weakref.WeakMethod(x.other_method)
        y_other = weakref.WeakMethod(y.other_method)
        # Equal objects, same method.
        check_equal(x_some, y_some)
        check_equal(x_other, y_other)
        # Equal objects, different methods.
        for left in (x_some, y_some):
            for right in (x_other, y_other):
                check_unequal(left, right)
        # Unequal objects, whether the method is the same or not.
        z = Object(2)
        z_some = weakref.WeakMethod(z.some_method)
        z_other = weakref.WeakMethod(z.other_method)
        for left in (x_some, y_some):
            for right in (z_some, z_other):
                check_unequal(left, right)
        # Comparison with other types.
        check_unequal(x_some, x.some_method)
        check_equal(x_some, ALWAYS_EQ)
        del x, y, z
        gc.collect()
        # Dead WeakMethods compare by identity.
        dead = (x_some, y_some, x_other, y_other, z_some, z_other)
        for left in dead:
            for right in dead:
                self.assertEqual(left == right, left is right)
                self.assertEqual(left != right, left is not right)

    def test_hashing(self):
        # A live WeakMethod is hashable when the underlying object is.
        x = Object(1)
        y = Object(1)
        x_some = weakref.WeakMethod(x.some_method)
        y_some = weakref.WeakMethod(y.some_method)
        never_hashed = weakref.WeakMethod(y.other_method)
        # Equal WeakMethods must have equal hashes.
        self.assertEqual(hash(x_some), hash(y_some))
        live_hash = hash(x_some)
        # A dead WeakMethod keeps the hash computed while it was alive.
        del x, y
        gc.collect()
        self.assertEqual(hash(x_some), live_hash)
        self.assertEqual(hash(y_some), live_hash)
        # A WeakMethod that was never hashed while alive cannot be hashed
        # once dead.
        with self.assertRaises(TypeError):
            hash(never_hashed)
1210
1211
1212class MappingTestCase(TestBase):
1213
    # Number of objects placed in the dictionaries built by
    # make_weak_keyed_dict() and make_weak_valued_dict().
    COUNT = 10
1215
    def check_len_cycles(self, dict_type, cons):
        """Check len() of a weak dict whose referents die via cyclic GC.

        `dict_type` is the weak dictionary class to instantiate and `cons`
        maps a RefCycle instance to the (key, value) pair stored for it.
        """
        N = 20
        items = [RefCycle() for i in range(N)]
        dct = dict_type(cons(o) for o in items)
        # Keep an iterator alive
        it = dct.items()
        try:
            next(it)
        except StopIteration:
            pass
        # Drop the only explicit strong references; the RefCycle instances
        # reference themselves, so an explicit collection is run.
        del items
        gc.collect()
        # Length while the iterator still exists.
        n1 = len(dct)
        del it
        gc.collect()
        # Length after the iterator is gone.
        n2 = len(dct)
        # one item may be kept alive inside the iterator
        self.assertIn(n1, (0, 1))
        self.assertEqual(n2, 0)
1235
1236    def test_weak_keyed_len_cycles(self):
1237        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
1238
1239    def test_weak_valued_len_cycles(self):
1240        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
1241
    def check_len_race(self, dict_type, cons):
        """Sanity-check len() of a weak dict under varying GC thresholds.

        `dict_type` is the weak dictionary class to instantiate and `cons`
        maps a RefCycle instance to the (key, value) pair stored for it.
        The GC thresholds in effect on entry are restored by a cleanup.
        """
        # Extended sanity checks for len() in the face of cyclic collection
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        # Repeat the scenario with every threshold value from 1 to 99.
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.items()
            try:
                next(it)
            except StopIteration:
                pass
            # Length with the iterator alive, then with it gone.
            n1 = len(dct)
            del it
            n2 = len(dct)
            # Only bounds are asserted: 0 <= n2 <= n1 <= N.
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
1265
1266    def test_weak_keyed_len_race(self):
1267        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
1268
1269    def test_weak_valued_len_race(self):
1270        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
1271
1272    def test_weak_values(self):
1273        #
1274        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
1275        #
1276        dict, objects = self.make_weak_valued_dict()
1277        for o in objects:
1278            self.assertEqual(weakref.getweakrefcount(o), 1)
1279            self.assertIs(o, dict[o.arg],
1280                         "wrong object returned by weak dict!")
1281        items1 = list(dict.items())
1282        items2 = list(dict.copy().items())
1283        items1.sort()
1284        items2.sort()
1285        self.assertEqual(items1, items2,
1286                     "cloning of weak-valued dictionary did not work!")
1287        del items1, items2
1288        self.assertEqual(len(dict), self.COUNT)
1289        del objects[0]
1290        gc_collect()  # For PyPy or other GCs.
1291        self.assertEqual(len(dict), self.COUNT - 1,
1292                     "deleting object did not cause dictionary update")
1293        del objects, o
1294        gc_collect()  # For PyPy or other GCs.
1295        self.assertEqual(len(dict), 0,
1296                     "deleting the values did not clear the dictionary")
1297        # regression on SF bug #447152:
1298        dict = weakref.WeakValueDictionary()
1299        self.assertRaises(KeyError, dict.__getitem__, 1)
1300        dict[2] = C()
1301        gc_collect()  # For PyPy or other GCs.
1302        self.assertRaises(KeyError, dict.__getitem__, 2)
1303
1304    def test_weak_keys(self):
1305        #
1306        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
1307        #  len(d), k in d.
1308        #
1309        dict, objects = self.make_weak_keyed_dict()
1310        for o in objects:
1311            self.assertEqual(weakref.getweakrefcount(o), 1,
1312                         "wrong number of weak references to %r!" % o)
1313            self.assertIs(o.arg, dict[o],
1314                         "wrong object returned by weak dict!")
1315        items1 = dict.items()
1316        items2 = dict.copy().items()
1317        self.assertEqual(set(items1), set(items2),
1318                     "cloning of weak-keyed dictionary did not work!")
1319        del items1, items2
1320        self.assertEqual(len(dict), self.COUNT)
1321        del objects[0]
1322        gc_collect()  # For PyPy or other GCs.
1323        self.assertEqual(len(dict), (self.COUNT - 1),
1324                     "deleting object did not cause dictionary update")
1325        del objects, o
1326        gc_collect()  # For PyPy or other GCs.
1327        self.assertEqual(len(dict), 0,
1328                     "deleting the keys did not clear the dictionary")
1329        o = Object(42)
1330        dict[o] = "What is the meaning of the universe?"
1331        self.assertIn(o, dict)
1332        self.assertNotIn(34, dict)
1333
1334    def test_weak_keyed_iters(self):
1335        dict, objects = self.make_weak_keyed_dict()
1336        self.check_iters(dict)
1337
1338        # Test keyrefs()
1339        refs = dict.keyrefs()
1340        self.assertEqual(len(refs), len(objects))
1341        objects2 = list(objects)
1342        for wr in refs:
1343            ob = wr()
1344            self.assertIn(ob, dict)
1345            self.assertIn(ob, dict)
1346            self.assertEqual(ob.arg, dict[ob])
1347            objects2.remove(ob)
1348        self.assertEqual(len(objects2), 0)
1349
1350        # Test iterkeyrefs()
1351        objects2 = list(objects)
1352        self.assertEqual(len(list(dict.keyrefs())), len(objects))
1353        for wr in dict.keyrefs():
1354            ob = wr()
1355            self.assertIn(ob, dict)
1356            self.assertIn(ob, dict)
1357            self.assertEqual(ob.arg, dict[ob])
1358            objects2.remove(ob)
1359        self.assertEqual(len(objects2), 0)
1360
1361    def test_weak_valued_iters(self):
1362        dict, objects = self.make_weak_valued_dict()
1363        self.check_iters(dict)
1364
1365        # Test valuerefs()
1366        refs = dict.valuerefs()
1367        self.assertEqual(len(refs), len(objects))
1368        objects2 = list(objects)
1369        for wr in refs:
1370            ob = wr()
1371            self.assertEqual(ob, dict[ob.arg])
1372            self.assertEqual(ob.arg, dict[ob.arg].arg)
1373            objects2.remove(ob)
1374        self.assertEqual(len(objects2), 0)
1375
1376        # Test itervaluerefs()
1377        objects2 = list(objects)
1378        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1379        for wr in dict.itervaluerefs():
1380            ob = wr()
1381            self.assertEqual(ob, dict[ob.arg])
1382            self.assertEqual(ob.arg, dict[ob.arg].arg)
1383            objects2.remove(ob)
1384        self.assertEqual(len(objects2), 0)
1385
1386    def check_iters(self, dict):
1387        # item iterator:
1388        items = list(dict.items())
1389        for item in dict.items():
1390            items.remove(item)
1391        self.assertFalse(items, "items() did not touch all items")
1392
1393        # key iterator, via __iter__():
1394        keys = list(dict.keys())
1395        for k in dict:
1396            keys.remove(k)
1397        self.assertFalse(keys, "__iter__() did not touch all keys")
1398
1399        # key iterator, via iterkeys():
1400        keys = list(dict.keys())
1401        for k in dict.keys():
1402            keys.remove(k)
1403        self.assertFalse(keys, "iterkeys() did not touch all keys")
1404
1405        # value iterator:
1406        values = list(dict.values())
1407        for v in dict.values():
1408            values.remove(v)
1409        self.assertFalse(values,
1410                     "itervalues() did not touch all values")
1411
    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
        """Check that an entry can die while `dict` is being iterated.

        `dict` is the weak dictionary under test, `objects` the list whose
        last element is deleted during the iteration, and `iter_name` the
        name of the `dict` method whose result is iterated (e.g. 'keys').
        """
        n = len(dict)
        it = iter(getattr(dict, iter_name)())
        next(it)             # Trigger internal iteration
        # Destroy an object
        del objects[-1]
        gc.collect()    # just in case
        # We have removed either the first consumed object, or another one
        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
        del it
        # The removal has been committed
        self.assertEqual(len(dict), n - 1)
1424
    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
        """Check explicit mutations of `dict` made inside `testcontext()`.

        `dict` is the weak dictionary under test; `testcontext` is a
        callable returning a context manager that yields a (key, value)
        pair (see the comment below for what it is expected to do).
        """
        # Check that we can explicitly mutate the weak dict without
        # interfering with delayed removal.
        # `testcontext` should create an iterator, destroy one of the
        # weakref'ed objects and then return a new key/value pair corresponding
        # to the destroyed object.
        # Membership test on the destroyed entry.
        with testcontext() as (k, v):
            self.assertNotIn(k, dict)
        # Deleting the destroyed entry with `del`.
        with testcontext() as (k, v):
            self.assertRaises(KeyError, dict.__delitem__, k)
        self.assertNotIn(k, dict)
        # Deleting the destroyed entry with pop().
        with testcontext() as (k, v):
            self.assertRaises(KeyError, dict.pop, k)
        self.assertNotIn(k, dict)
        # Re-inserting the recreated pair.
        with testcontext() as (k, v):
            dict[k] = v
        self.assertEqual(dict[k], v)
        # Updating from a copy taken before the context is entered.
        ddict = copy.copy(dict)
        with testcontext() as (k, v):
            dict.update(ddict)
        self.assertEqual(dict, ddict)
        # Clearing the whole dictionary.
        with testcontext() as (k, v):
            dict.clear()
        self.assertEqual(len(dict), 0)
1449
    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
        """Check len() of `dict` across explicit and deferred removals.

        `dict` is the weak dictionary under test; `testcontext` is a
        callable returning a context manager (see the comment below for
        what each call is expected to do).
        """
        # Check that len() works when both iterating and removing keys
        # explicitly through various means (.pop(), .clear()...), while
        # implicit mutation is deferred because an iterator is alive.
        # (each call to testcontext() should schedule one item for removal
        #  for this test to work properly)
        o = Object(123456)
        with testcontext():
            n = len(dict)
            # Since underlying dict is ordered, first item is popped
            dict.pop(next(dict.keys()))
            self.assertEqual(len(dict), n - 1)
            dict[o] = o
            self.assertEqual(len(dict), n)
        # last item in objects is removed from dict in context shutdown
        with testcontext():
            self.assertEqual(len(dict), n - 1)
            # Then, (o, o) is popped
            dict.popitem()
            self.assertEqual(len(dict), n - 2)
        # From here on, every context accounts for two removals: the one
        # committed when the previous context exited, and an explicit one.
        with testcontext():
            self.assertEqual(len(dict), n - 3)
            del dict[next(dict.keys())]
            self.assertEqual(len(dict), n - 4)
        with testcontext():
            self.assertEqual(len(dict), n - 5)
            dict.popitem()
            self.assertEqual(len(dict), n - 6)
        with testcontext():
            dict.clear()
            self.assertEqual(len(dict), 0)
        self.assertEqual(len(dict), 0)
1482
    def test_weak_keys_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # Each call deletes the last remaining element of `objects`.
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
        dict, objects = self.make_weak_keyed_dict()
        # `testcontext` looks up `dict` and `objects` in this enclosing
        # scope every time it is entered, so it follows the rebinding of
        # both names further down.
        @contextlib.contextmanager
        def testcontext():
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        # Issue #21173: len() fragile when keys are both implicitly and
        # explicitly removed.
        dict, objects = self.make_weak_keyed_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1508
    def test_weak_values_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        # Each call deletes the last remaining element of `objects`.
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
        dict, objects = self.make_weak_valued_dict()
        # `testcontext` looks up `dict` and `objects` in this enclosing
        # scope every time it is entered, so it follows the rebinding of
        # both names further down.
        @contextlib.contextmanager
        def testcontext():
            try:
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        # Fresh dictionary for the len() checks.
        dict, objects = self.make_weak_valued_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1533
1534    def test_make_weak_keyed_dict_from_dict(self):
1535        o = Object(3)
1536        dict = weakref.WeakKeyDictionary({o:364})
1537        self.assertEqual(dict[o], 364)
1538
1539    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1540        o = Object(3)
1541        dict = weakref.WeakKeyDictionary({o:364})
1542        dict2 = weakref.WeakKeyDictionary(dict)
1543        self.assertEqual(dict[o], 364)
1544
1545    def make_weak_keyed_dict(self):
1546        dict = weakref.WeakKeyDictionary()
1547        objects = list(map(Object, range(self.COUNT)))
1548        for o in objects:
1549            dict[o] = o.arg
1550        return dict, objects
1551
1552    def test_make_weak_valued_dict_from_dict(self):
1553        o = Object(3)
1554        dict = weakref.WeakValueDictionary({364:o})
1555        self.assertEqual(dict[364], o)
1556
1557    def test_make_weak_valued_dict_from_weak_valued_dict(self):
1558        o = Object(3)
1559        dict = weakref.WeakValueDictionary({364:o})
1560        dict2 = weakref.WeakValueDictionary(dict)
1561        self.assertEqual(dict[364], o)
1562
1563    def test_make_weak_valued_dict_misc(self):
1564        # errors
1565        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1566        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1567        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1568        # special keyword arguments
1569        o = Object(3)
1570        for kw in 'self', 'dict', 'other', 'iterable':
1571            d = weakref.WeakValueDictionary(**{kw: o})
1572            self.assertEqual(list(d.keys()), [kw])
1573            self.assertEqual(d[kw], o)
1574
1575    def make_weak_valued_dict(self):
1576        dict = weakref.WeakValueDictionary()
1577        objects = list(map(Object, range(self.COUNT)))
1578        for o in objects:
1579            dict[o.arg] = o
1580        return dict, objects
1581
1582    def check_popitem(self, klass, key1, value1, key2, value2):
1583        weakdict = klass()
1584        weakdict[key1] = value1
1585        weakdict[key2] = value2
1586        self.assertEqual(len(weakdict), 2)
1587        k, v = weakdict.popitem()
1588        self.assertEqual(len(weakdict), 1)
1589        if k is key1:
1590            self.assertIs(v, value1)
1591        else:
1592            self.assertIs(v, value2)
1593        k, v = weakdict.popitem()
1594        self.assertEqual(len(weakdict), 0)
1595        if k is key1:
1596            self.assertIs(v, value1)
1597        else:
1598            self.assertIs(v, value2)
1599
1600    def test_weak_valued_dict_popitem(self):
1601        self.check_popitem(weakref.WeakValueDictionary,
1602                           "key1", C(), "key2", C())
1603
1604    def test_weak_keyed_dict_popitem(self):
1605        self.check_popitem(weakref.WeakKeyDictionary,
1606                           C(), "value 1", C(), "value 2")
1607
1608    def check_setdefault(self, klass, key, value1, value2):
1609        self.assertIsNot(value1, value2,
1610                     "invalid test"
1611                     " -- value parameters must be distinct objects")
1612        weakdict = klass()
1613        o = weakdict.setdefault(key, value1)
1614        self.assertIs(o, value1)
1615        self.assertIn(key, weakdict)
1616        self.assertIs(weakdict.get(key), value1)
1617        self.assertIs(weakdict[key], value1)
1618
1619        o = weakdict.setdefault(key, value2)
1620        self.assertIs(o, value1)
1621        self.assertIn(key, weakdict)
1622        self.assertIs(weakdict.get(key), value1)
1623        self.assertIs(weakdict[key], value1)
1624
1625    def test_weak_valued_dict_setdefault(self):
1626        self.check_setdefault(weakref.WeakValueDictionary,
1627                              "key", C(), C())
1628
1629    def test_weak_keyed_dict_setdefault(self):
1630        self.check_setdefault(weakref.WeakKeyDictionary,
1631                              C(), "value 1", "value 2")
1632
1633    def check_update(self, klass, dict):
1634        #
1635        #  This exercises d.update(), len(d), d.keys(), k in d,
1636        #  d.get(), d[].
1637        #
1638        weakdict = klass()
1639        weakdict.update(dict)
1640        self.assertEqual(len(weakdict), len(dict))
1641        for k in weakdict.keys():
1642            self.assertIn(k, dict, "mysterious new key appeared in weak dict")
1643            v = dict.get(k)
1644            self.assertIs(v, weakdict[k])
1645            self.assertIs(v, weakdict.get(k))
1646        for k in dict.keys():
1647            self.assertIn(k, weakdict, "original key disappeared in weak dict")
1648            v = dict[k]
1649            self.assertIs(v, weakdict[k])
1650            self.assertIs(v, weakdict.get(k))
1651
1652    def test_weak_valued_dict_update(self):
1653        self.check_update(weakref.WeakValueDictionary,
1654                          {1: C(), 'a': C(), C(): C()})
1655        # errors
1656        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1657        d = weakref.WeakValueDictionary()
1658        self.assertRaises(TypeError, d.update, {}, {})
1659        self.assertRaises(TypeError, d.update, (), ())
1660        self.assertEqual(list(d.keys()), [])
1661        # special keyword arguments
1662        o = Object(3)
1663        for kw in 'self', 'dict', 'other', 'iterable':
1664            d = weakref.WeakValueDictionary()
1665            d.update(**{kw: o})
1666            self.assertEqual(list(d.keys()), [kw])
1667            self.assertEqual(d[kw], o)
1668
1669    def test_weak_valued_union_operators(self):
1670        a = C()
1671        b = C()
1672        c = C()
1673        wvd1 = weakref.WeakValueDictionary({1: a})
1674        wvd2 = weakref.WeakValueDictionary({1: b, 2: a})
1675        wvd3 = wvd1.copy()
1676        d1 = {1: c, 3: b}
1677        pairs = [(5, c), (6, b)]
1678
1679        tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries
1680        self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2))
1681        self.assertIs(type(tmp1), weakref.WeakValueDictionary)
1682        wvd1 |= wvd2
1683        self.assertEqual(wvd1, tmp1)
1684
1685        tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping
1686        self.assertEqual(dict(tmp2), dict(wvd2) | d1)
1687        self.assertIs(type(tmp2), weakref.WeakValueDictionary)
1688        wvd2 |= d1
1689        self.assertEqual(wvd2, tmp2)
1690
1691        tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value
1692        tmp3 |= pairs
1693        self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs))
1694        self.assertIs(type(tmp3), weakref.WeakValueDictionary)
1695
1696        tmp4 = d1 | wvd3 # Testing .__ror__
1697        self.assertEqual(dict(tmp4), d1 | dict(wvd3))
1698        self.assertIs(type(tmp4), weakref.WeakValueDictionary)
1699
1700        del a
1701        self.assertNotIn(2, tmp1)
1702        self.assertNotIn(2, tmp2)
1703        self.assertNotIn(1, tmp3)
1704        self.assertNotIn(1, tmp4)
1705
1706    def test_weak_keyed_dict_update(self):
1707        self.check_update(weakref.WeakKeyDictionary,
1708                          {C(): 1, C(): 2, C(): 3})
1709
1710    def test_weak_keyed_delitem(self):
1711        d = weakref.WeakKeyDictionary()
1712        o1 = Object('1')
1713        o2 = Object('2')
1714        d[o1] = 'something'
1715        d[o2] = 'something'
1716        self.assertEqual(len(d), 2)
1717        del d[o1]
1718        self.assertEqual(len(d), 1)
1719        self.assertEqual(list(d.keys()), [o2])
1720
1721    def test_weak_keyed_union_operators(self):
1722        o1 = C()
1723        o2 = C()
1724        o3 = C()
1725        wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2})
1726        wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4})
1727        wkd3 = wkd1.copy()
1728        d1 = {o2: '5', o3: '6'}
1729        pairs = [(o2, 7), (o3, 8)]
1730
1731        tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries
1732        self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2))
1733        self.assertIs(type(tmp1), weakref.WeakKeyDictionary)
1734        wkd1 |= wkd2
1735        self.assertEqual(wkd1, tmp1)
1736
1737        tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping
1738        self.assertEqual(dict(tmp2), dict(wkd2) | d1)
1739        self.assertIs(type(tmp2), weakref.WeakKeyDictionary)
1740        wkd2 |= d1
1741        self.assertEqual(wkd2, tmp2)
1742
1743        tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value
1744        tmp3 |= pairs
1745        self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs))
1746        self.assertIs(type(tmp3), weakref.WeakKeyDictionary)
1747
1748        tmp4 = d1 | wkd3 # Testing .__ror__
1749        self.assertEqual(dict(tmp4), d1 | dict(wkd3))
1750        self.assertIs(type(tmp4), weakref.WeakKeyDictionary)
1751
1752        del o1
1753        self.assertNotIn(4, tmp1.values())
1754        self.assertNotIn(4, tmp2.values())
1755        self.assertNotIn(1, tmp3.values())
1756        self.assertNotIn(1, tmp4.values())
1757
1758    def test_weak_valued_delitem(self):
1759        d = weakref.WeakValueDictionary()
1760        o1 = Object('1')
1761        o2 = Object('2')
1762        d['something'] = o1
1763        d['something else'] = o2
1764        self.assertEqual(len(d), 2)
1765        del d['something']
1766        self.assertEqual(len(d), 1)
1767        self.assertEqual(list(d.items()), [('something else', o2)])
1768
1769    def test_weak_keyed_bad_delitem(self):
1770        d = weakref.WeakKeyDictionary()
1771        o = Object('1')
1772        # An attempt to delete an object that isn't there should raise
1773        # KeyError.  It didn't before 2.3.
1774        self.assertRaises(KeyError, d.__delitem__, o)
1775        self.assertRaises(KeyError, d.__getitem__, o)
1776
1777        # If a key isn't of a weakly referencable type, __getitem__ and
1778        # __setitem__ raise TypeError.  __delitem__ should too.
1779        self.assertRaises(TypeError, d.__delitem__,  13)
1780        self.assertRaises(TypeError, d.__getitem__,  13)
1781        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1782
    def test_weak_keyed_cascading_deletes(self):
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.
        #
        # The test deletes keys whose __eq__ has the side effect of killing
        # another key, and checks that the deletions still succeed.

        d = weakref.WeakKeyDictionary()
        # Read by C.__eq__ below; stays False while the dict is populated.
        mutate = False

        # NOTE: this local class shadows the module-level C inside this
        # method only.
        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = list(d.keys())
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time through the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in objs" loop would have gotten to.
        mutate = True
        count = 0
        for o in objs:
            count += 1
            del d[o]
        gc_collect()  # For PyPy or other GCs.
        # Expected outcome: all four entries are gone after only two loop
        # iterations, because each deletion also shortens objs from the end.
        self.assertEqual(len(d), 0)
        self.assertEqual(count, 2)
1828
1829    def test_make_weak_valued_dict_repr(self):
1830        dict = weakref.WeakValueDictionary()
1831        self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
1832
1833    def test_make_weak_keyed_dict_repr(self):
1834        dict = weakref.WeakKeyDictionary()
1835        self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
1836
    @threading_helper.requires_working_threading()
    def test_threaded_weak_valued_setdefault(self):
        # Call setdefault() repeatedly with a fresh RefCycle as the default
        # while collect_in_thread() runs gc.collect() in another thread.
        # Whatever setdefault() returns, it must never be None.
        d = weakref.WeakValueDictionary()
        with collect_in_thread():
            for i in range(100000):
                x = d.setdefault(10, RefCycle())
                self.assertIsNot(x, None)  # we never put None in there!
                # Drop our strong reference so the stored value is only kept
                # alive by its own reference cycle until the next collection.
                del x
1845
    @threading_helper.requires_working_threading()
    def test_threaded_weak_valued_pop(self):
        # Store a fresh RefCycle under key 10 and pop it again, over and
        # over, while collect_in_thread() runs gc.collect() in another
        # thread.  pop() must never hand back None.
        d = weakref.WeakValueDictionary()
        with collect_in_thread():
            for i in range(100000):
                # No strong reference to the new value is kept here, so it
                # may already have been collected by the time pop() runs.
                d[10] = RefCycle()
                # 10 is the fallback returned when the key is missing.
                x = d.pop(10, 10)
                self.assertIsNot(x, None)  # we never put None in there!
1854
    @threading_helper.requires_working_threading()
    def test_threaded_weak_valued_consistency(self):
        # Issue #28427: old keys should not remove new values from
        # WeakValueDictionary when collecting from another thread.
        #
        # Each iteration overwrites key 10 with a new RefCycle while the
        # previous one (no longer strongly referenced) is left for the
        # collecting thread started by collect_in_thread().
        d = weakref.WeakValueDictionary()
        with collect_in_thread():
            for i in range(200000):
                o = RefCycle()
                d[10] = o
                # o is still alive, so the dict can't be empty
                self.assertEqual(len(d), 1)
                o = None  # lose ref
1867
1868    def check_threaded_weak_dict_copy(self, type_, deepcopy):
1869        # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
1870        # `deepcopy` should be either True or False.
1871        exc = []
1872
1873        class DummyKey:
1874            def __init__(self, ctr):
1875                self.ctr = ctr
1876
1877        class DummyValue:
1878            def __init__(self, ctr):
1879                self.ctr = ctr
1880
1881        def dict_copy(d, exc):
1882            try:
1883                if deepcopy is True:
1884                    _ = copy.deepcopy(d)
1885                else:
1886                    _ = d.copy()
1887            except Exception as ex:
1888                exc.append(ex)
1889
1890        def pop_and_collect(lst):
1891            gc_ctr = 0
1892            while lst:
1893                i = random.randint(0, len(lst) - 1)
1894                gc_ctr += 1
1895                lst.pop(i)
1896                if gc_ctr % 10000 == 0:
1897                    gc.collect()  # just in case
1898
1899        self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))
1900
1901        d = type_()
1902        keys = []
1903        values = []
1904        # Initialize d with many entries
1905        for i in range(70000):
1906            k, v = DummyKey(i), DummyValue(i)
1907            keys.append(k)
1908            values.append(v)
1909            d[k] = v
1910            del k
1911            del v
1912
1913        t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
1914        if type_ is weakref.WeakKeyDictionary:
1915            t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
1916        else:  # weakref.WeakValueDictionary
1917            t_collect = threading.Thread(target=pop_and_collect, args=(values,))
1918
1919        t_copy.start()
1920        t_collect.start()
1921
1922        t_copy.join()
1923        t_collect.join()
1924
1925        # Test exceptions
1926        if exc:
1927            raise exc[0]
1928
1929    @threading_helper.requires_working_threading()
1930    def test_threaded_weak_key_dict_copy(self):
1931        # Issue #35615: Weakref keys or values getting GC'ed during dict
1932        # copying should not result in a crash.
1933        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
1934
1935    @threading_helper.requires_working_threading()
1936    def test_threaded_weak_key_dict_deepcopy(self):
1937        # Issue #35615: Weakref keys or values getting GC'ed during dict
1938        # copying should not result in a crash.
1939        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
1940
1941    @threading_helper.requires_working_threading()
1942    def test_threaded_weak_value_dict_copy(self):
1943        # Issue #35615: Weakref keys or values getting GC'ed during dict
1944        # copying should not result in a crash.
1945        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
1946
1947    @threading_helper.requires_working_threading()
1948    def test_threaded_weak_value_dict_deepcopy(self):
1949        # Issue #35615: Weakref keys or values getting GC'ed during dict
1950        # copying should not result in a crash.
1951        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True)
1952
1953    @support.cpython_only
1954    def test_remove_closure(self):
1955        d = weakref.WeakValueDictionary()
1956        self.assertIsNone(d._remove.__closure__)
1957
1958
1959from test import mapping_tests
1960
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakValueDictionary conforms to the mapping protocol"""

    type2test = weakref.WeakValueDictionary

    # Sample mapping shared by all tests; being a class attribute, it holds
    # strong references to the three values.
    __ref = {
        "key1": Object(1),
        "key2": Object(2),
        "key3": Object(3),
    }

    def _reference(self):
        # Return a new plain dict each time so the shared sample is never
        # handed out directly.
        return dict(self.__ref)
1967
class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakKeyDictionary conforms to the mapping protocol"""

    type2test = weakref.WeakKeyDictionary

    # Sample mapping shared by all tests; being a class attribute, it holds
    # strong references to the three keys.
    __ref = {
        Object("key1"): 1,
        Object("key2"): 2,
        Object("key3"): 3,
    }

    def _reference(self):
        # Return a new plain dict each time so the shared sample is never
        # handed out directly.
        return dict(self.__ref)
1974
1975
class FinalizeTestCase(unittest.TestCase):
    """Tests for weakref.finalize."""

    class A:
        pass

    def _collect_if_necessary(self):
        # we create no ref-cycles so in CPython no gc should be needed
        if sys.implementation.name != 'cpython':
            support.gc_collect()

    def test_finalize(self):
        def add(x,y,z):
            res.append(x + y + z)
            return x + y + z

        a = self.A()

        # Calling the finalizer explicitly runs the callback once and
        # returns its result; afterwards the finalizer is dead and every
        # operation on it yields None.
        res = []
        f = weakref.finalize(a, add, 67, 43, z=89)
        self.assertTrue(f.alive)
        self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
        self.assertEqual(f(), 199)
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertFalse(f.alive)
        self.assertEqual(res, [199])

        # detach() returns the registration and the callback never runs.
        res = []
        f = weakref.finalize(a, add, 67, 43, 89)
        self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
        self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertFalse(f.alive)
        self.assertEqual(res, [])

        # Dropping the last reference to `a` runs the callback; calling the
        # finalizer afterwards does nothing.
        res = []
        f = weakref.finalize(a, add, x=67, y=43, z=89)
        del a
        self._collect_if_necessary()
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertFalse(f.alive)
        self.assertEqual(res, [199])

    def test_arg_errors(self):
        def fin(*args, **kwargs):
            res.append((args, kwargs))

        a = self.A()

        # Keyword arguments named `func` and `obj` are passed through to the
        # callback like any other keyword.
        res = []
        f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
        self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
        f()
        self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])

        # The object and the callback cannot be given by keyword, and
        # neither of them may be omitted.
        with self.assertRaises(TypeError):
            weakref.finalize(a, func=fin, arg=1)
        with self.assertRaises(TypeError):
            weakref.finalize(obj=a, func=fin, arg=1)
        self.assertRaises(TypeError, weakref.finalize, a)
        self.assertRaises(TypeError, weakref.finalize)

    def test_order(self):
        a = self.A()
        res = []

        f1 = weakref.finalize(a, res.append, 'f1')
        f2 = weakref.finalize(a, res.append, 'f2')
        f3 = weakref.finalize(a, res.append, 'f3')
        f4 = weakref.finalize(a, res.append, 'f4')
        f5 = weakref.finalize(a, res.append, 'f5')

        # make sure finalizers can keep themselves alive
        del f1, f4

        self.assertTrue(f2.alive)
        self.assertTrue(f3.alive)
        self.assertTrue(f5.alive)

        self.assertTrue(f5.detach())
        self.assertFalse(f5.alive)

        f5()                       # nothing because previously unregistered
        res.append('A')
        f3()                       # => res.append('f3')
        self.assertFalse(f3.alive)
        res.append('B')
        f3()                       # nothing because previously called
        res.append('C')
        del a
        self._collect_if_necessary()
                                   # => res.append('f4')
                                   # => res.append('f2')
                                   # => res.append('f1')
        self.assertFalse(f2.alive)
        res.append('D')
        f2()                       # nothing because previously called by gc

        expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
        self.assertEqual(res, expected)

    def test_all_freed(self):
        # we want a weakrefable subclass of weakref.finalize
        class MyFinalizer(weakref.finalize):
            pass

        a = self.A()
        res = []
        def callback():
            res.append(123)
        f = MyFinalizer(a, callback)

        wr_callback = weakref.ref(callback)
        wr_f = weakref.ref(f)
        del callback, f

        # With our own references gone, both the callback and the finalizer
        # must still be alive for as long as `a` is.
        self.assertIsNotNone(wr_callback())
        self.assertIsNotNone(wr_f())

        del a
        self._collect_if_necessary()

        # Once `a` is gone the callback has run and both are released.
        self.assertIsNone(wr_callback())
        self.assertIsNone(wr_f())
        self.assertEqual(res, [123])

    @classmethod
    def run_in_child(cls):
        """Register finalizers on `cls` that are left to run at exit.

        test_atexit() runs this in a child interpreter and inspects what
        the finalizers print.
        """
        def error():
            # Create an atexit finalizer from inside a finalizer called
            # at exit.  This should be the next to be run.
            # The new finalizer is not bound to a name: finalizers can keep
            # themselves alive.
            weakref.finalize(cls, print, 'g1')
            print('f3 error')
            1/0

        # cls should stay alive till atexit callbacks run
        f1 = weakref.finalize(cls, print, 'f1', _global_var)
        f2 = weakref.finalize(cls, print, 'f2', _global_var)
        f3 = weakref.finalize(cls, error)
        f4 = weakref.finalize(cls, print, 'f4', _global_var)

        # Only f2 is switched off, and test_atexit() expects no 'f2' line.
        assert f1.atexit
        f2.atexit = False
        assert f3.atexit
        assert f4.atexit

    def test_atexit(self):
        prog = ('from test.test_weakref import FinalizeTestCase;'+
                'FinalizeTestCase.run_in_child()')
        rc, out, err = script_helper.assert_python_ok('-c', prog)
        out = out.decode('ascii').splitlines()
        # Expected output: the finalizers run in reverse order of creation,
        # 'g1' (created while f3 runs) comes right after 'f3 error', and
        # there is no 'f2' line.
        self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
        # The 1/0 in the f3 callback must be reported on stderr.
        self.assertIn(b'ZeroDivisionError', err)
2137
2138
2139class ModuleTestCase(unittest.TestCase):
2140    def test_names(self):
2141        for name in ('ReferenceType', 'ProxyType', 'CallableProxyType',
2142                     'WeakMethod', 'WeakSet', 'WeakKeyDictionary', 'WeakValueDictionary'):
2143            obj = getattr(weakref, name)
2144            if name != 'WeakSet':
2145                self.assertEqual(obj.__module__, 'weakref')
2146            self.assertEqual(obj.__name__, name)
2147            self.assertEqual(obj.__qualname__, name)
2148
2149
# Doctest text based on the examples in the library reference (weakref.rst),
# with a few extra checks marked "just testing".  The string is runtime
# input for doctest, so its content must stay exactly as written.
libreftest = """ Doctest for examples in the library reference: weakref.rst

>>> from test.support import gc_collect
>>> import weakref
>>> class Dict(dict):
...     pass
...
>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
>>> r = weakref.ref(obj)
>>> print(r() is obj)
True

>>> import weakref
>>> class Object:
...     pass
...
>>> o = Object()
>>> r = weakref.ref(o)
>>> o2 = r()
>>> o is o2
True
>>> del o, o2
>>> gc_collect()  # For PyPy or other GCs.
>>> print(r())
None

>>> import weakref
>>> class ExtendedRef(weakref.ref):
...     def __init__(self, ob, callback=None, **annotations):
...         super().__init__(ob, callback)
...         self.__counter = 0
...         for k, v in annotations.items():
...             setattr(self, k, v)
...     def __call__(self):
...         '''Return a pair containing the referent and the number of
...         times the reference has been called.
...         '''
...         ob = super().__call__()
...         if ob is not None:
...             self.__counter += 1
...             ob = (ob, self.__counter)
...         return ob
...
>>> class A:   # not in docs from here, just testing the ExtendedRef
...     pass
...
>>> a = A()
>>> r = ExtendedRef(a, foo=1, bar="baz")
>>> r.foo
1
>>> r.bar
'baz'
>>> r()[1]
1
>>> r()[1]
2
>>> r()[0] is a
True


>>> import weakref
>>> _id2obj_dict = weakref.WeakValueDictionary()
>>> def remember(obj):
...     oid = id(obj)
...     _id2obj_dict[oid] = obj
...     return oid
...
>>> def id2obj(oid):
...     return _id2obj_dict[oid]
...
>>> a = A()             # from here, just testing
>>> a_id = remember(a)
>>> id2obj(a_id) is a
True
>>> del a
>>> gc_collect()  # For PyPy or other GCs.
>>> try:
...     id2obj(a_id)
... except KeyError:
...     print('OK')
... else:
...     print('WeakValueDictionary error')
OK

"""

# doctest consults the module-level __test__ mapping for additional
# docstrings to check; this registers the text above under 'libreftest'.
__test__ = {'libreftest' : libreftest}
2237
def load_tests(loader, tests, pattern):
    """Add this module's doctests to `tests` and return the suite.

    `loader` and `pattern` are accepted but not used.
    """
    # DocTestSuite() is called without a module argument and directly from
    # this function, so it picks up the module this function lives in.
    doctest_suite = doctest.DocTestSuite()
    tests.addTest(doctest_suite)
    return tests
2241
2242
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2245