1import unittest
2import unittest.mock
3from test.support import (verbose, refcount_test,
4                          cpython_only, requires_subprocess)
5from test.support.import_helper import import_module
6from test.support.os_helper import temp_dir, TESTFN, unlink
7from test.support.script_helper import assert_python_ok, make_script
8from test.support import threading_helper
9
10import gc
11import sys
12import sysconfig
13import textwrap
14import threading
15import time
16import weakref
17
18try:
19    from _testcapi import with_tp_del
20except ImportError:
21    def with_tp_del(cls):
22        class C(object):
23            def __new__(cls, *args, **kwargs):
24                raise TypeError('requires _testcapi.with_tp_del')
25        return C
26
27try:
28    from _testcapi import ContainerNoGC
29except ImportError:
30    ContainerNoGC = None
31
32### Support code
33###############################################################################
34
35# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
36# cyclic gc.
37
38# An instance of C1055820 has a self-loop, so becomes cyclic trash when
39# unreachable.
40class C1055820(object):
41    def __init__(self, i):
42        self.i = i
43        self.loop = self
44
45class GC_Detector(object):
46    # Create an instance I.  Then gc hasn't happened again so long as
47    # I.gc_happened is false.
48
49    def __init__(self):
50        self.gc_happened = False
51
52        def it_happened(ignored):
53            self.gc_happened = True
54
55        # Create a piece of cyclic trash that triggers it_happened when
56        # gc collects it.
57        self.wr = weakref.ref(C1055820(666), it_happened)
58
59@with_tp_del
60class Uncollectable(object):
61    """Create a reference cycle with multiple __del__ methods.
62
63    An object in a reference cycle will never have zero references,
64    and so must be garbage collected.  If one or more objects in the
65    cycle have __del__ methods, the gc refuses to guess an order,
66    and leaves the cycle uncollected."""
67    def __init__(self, partner=None):
68        if partner is None:
69            self.partner = Uncollectable(partner=self)
70        else:
71            self.partner = partner
72    def __tp_del__(self):
73        pass
74
75if sysconfig.get_config_vars().get('PY_CFLAGS', ''):
76    BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS'])
77else:
78    # Usually, sys.gettotalrefcount() is only present if Python has been
79    # compiled in debug mode. If it's missing, expect that Python has
80    # been released in release mode: with NDEBUG defined.
81    BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount'))
82
83### Tests
84###############################################################################
85
86class GCTests(unittest.TestCase):
87    def test_list(self):
88        l = []
89        l.append(l)
90        gc.collect()
91        del l
92        self.assertEqual(gc.collect(), 1)
93
94    def test_dict(self):
95        d = {}
96        d[1] = d
97        gc.collect()
98        del d
99        self.assertEqual(gc.collect(), 1)
100
101    def test_tuple(self):
102        # since tuples are immutable we close the loop with a list
103        l = []
104        t = (l,)
105        l.append(t)
106        gc.collect()
107        del t
108        del l
109        self.assertEqual(gc.collect(), 2)
110
111    def test_class(self):
112        class A:
113            pass
114        A.a = A
115        gc.collect()
116        del A
117        self.assertNotEqual(gc.collect(), 0)
118
119    def test_newstyleclass(self):
120        class A(object):
121            pass
122        gc.collect()
123        del A
124        self.assertNotEqual(gc.collect(), 0)
125
126    def test_instance(self):
127        class A:
128            pass
129        a = A()
130        a.a = a
131        gc.collect()
132        del a
133        self.assertNotEqual(gc.collect(), 0)
134
135    def test_newinstance(self):
136        class A(object):
137            pass
138        a = A()
139        a.a = a
140        gc.collect()
141        del a
142        self.assertNotEqual(gc.collect(), 0)
143        class B(list):
144            pass
145        class C(B, A):
146            pass
147        a = C()
148        a.a = a
149        gc.collect()
150        del a
151        self.assertNotEqual(gc.collect(), 0)
152        del B, C
153        self.assertNotEqual(gc.collect(), 0)
154        A.a = A()
155        del A
156        self.assertNotEqual(gc.collect(), 0)
157        self.assertEqual(gc.collect(), 0)
158
159    def test_method(self):
160        # Tricky: self.__init__ is a bound method, it references the instance.
161        class A:
162            def __init__(self):
163                self.init = self.__init__
164        a = A()
165        gc.collect()
166        del a
167        self.assertNotEqual(gc.collect(), 0)
168
169    @cpython_only
170    def test_legacy_finalizer(self):
171        # A() is uncollectable if it is part of a cycle, make sure it shows up
172        # in gc.garbage.
173        @with_tp_del
174        class A:
175            def __tp_del__(self): pass
176        class B:
177            pass
178        a = A()
179        a.a = a
180        id_a = id(a)
181        b = B()
182        b.b = b
183        gc.collect()
184        del a
185        del b
186        self.assertNotEqual(gc.collect(), 0)
187        for obj in gc.garbage:
188            if id(obj) == id_a:
189                del obj.a
190                break
191        else:
192            self.fail("didn't find obj in garbage (finalizer)")
193        gc.garbage.remove(obj)
194
195    @cpython_only
196    def test_legacy_finalizer_newclass(self):
197        # A() is uncollectable if it is part of a cycle, make sure it shows up
198        # in gc.garbage.
199        @with_tp_del
200        class A(object):
201            def __tp_del__(self): pass
202        class B(object):
203            pass
204        a = A()
205        a.a = a
206        id_a = id(a)
207        b = B()
208        b.b = b
209        gc.collect()
210        del a
211        del b
212        self.assertNotEqual(gc.collect(), 0)
213        for obj in gc.garbage:
214            if id(obj) == id_a:
215                del obj.a
216                break
217        else:
218            self.fail("didn't find obj in garbage (finalizer)")
219        gc.garbage.remove(obj)
220
221    def test_function(self):
222        # Tricky: f -> d -> f, code should call d.clear() after the exec to
223        # break the cycle.
224        d = {}
225        exec("def f(): pass\n", d)
226        gc.collect()
227        del d
228        self.assertEqual(gc.collect(), 2)
229
230    def test_function_tp_clear_leaves_consistent_state(self):
231        # https://github.com/python/cpython/issues/91636
232        code = """if 1:
233
234        import gc
235        import weakref
236
237        class LateFin:
238            __slots__ = ('ref',)
239
240            def __del__(self):
241
242                # 8. Now `latefin`'s finalizer is called. Here we
243                #    obtain a reference to `func`, which is currently
244                #    undergoing `tp_clear`.
245                global func
246                func = self.ref()
247
248        class Cyclic(tuple):
249            __slots__ = ()
250
251            # 4. The finalizers of all garbage objects are called. In
252            #    this case this is only us as `func` doesn't have a
253            #    finalizer.
254            def __del__(self):
255
256                # 5. Create a weakref to `func` now. If we had created
257                #    it earlier, it would have been cleared by the
258                #    garbage collector before calling the finalizers.
259                self[1].ref = weakref.ref(self[0])
260
261                # 6. Drop the global reference to `latefin`. The only
262                #    remaining reference is the one we have.
263                global latefin
264                del latefin
265
266            # 7. Now `func` is `tp_clear`-ed. This drops the last
267            #    reference to `Cyclic`, which gets `tp_dealloc`-ed.
268            #    This drops the last reference to `latefin`.
269
270        latefin = LateFin()
271        def func():
272            pass
273        cyc = tuple.__new__(Cyclic, (func, latefin))
274
275        # 1. Create a reference cycle of `cyc` and `func`.
276        func.__module__ = cyc
277
278        # 2. Make the cycle unreachable, but keep the global reference
279        #    to `latefin` so that it isn't detected as garbage. This
280        #    way its finalizer will not be called immediately.
281        del func, cyc
282
283        # 3. Invoke garbage collection,
284        #    which will find `cyc` and `func` as garbage.
285        gc.collect()
286
287        # 9. Previously, this would crash because `func_qualname`
288        #    had been NULL-ed out by func_clear().
289        print(f"{func=}")
290        """
291        # We're mostly just checking that this doesn't crash.
292        rc, stdout, stderr = assert_python_ok("-c", code)
293        self.assertEqual(rc, 0)
294        self.assertRegex(stdout, rb"""\A\s*func=<function  at \S+>\s*\Z""")
295        self.assertFalse(stderr)
296
297    @refcount_test
298    def test_frame(self):
299        def f():
300            frame = sys._getframe()
301        gc.collect()
302        f()
303        self.assertEqual(gc.collect(), 1)
304
305    def test_saveall(self):
306        # Verify that cyclic garbage like lists show up in gc.garbage if the
307        # SAVEALL option is enabled.
308
309        # First make sure we don't save away other stuff that just happens to
310        # be waiting for collection.
311        gc.collect()
312        # if this fails, someone else created immortal trash
313        self.assertEqual(gc.garbage, [])
314
315        L = []
316        L.append(L)
317        id_L = id(L)
318
319        debug = gc.get_debug()
320        gc.set_debug(debug | gc.DEBUG_SAVEALL)
321        del L
322        gc.collect()
323        gc.set_debug(debug)
324
325        self.assertEqual(len(gc.garbage), 1)
326        obj = gc.garbage.pop()
327        self.assertEqual(id(obj), id_L)
328
329    def test_del(self):
330        # __del__ methods can trigger collection, make this to happen
331        thresholds = gc.get_threshold()
332        gc.enable()
333        gc.set_threshold(1)
334
335        class A:
336            def __del__(self):
337                dir(self)
338        a = A()
339        del a
340
341        gc.disable()
342        gc.set_threshold(*thresholds)
343
344    def test_del_newclass(self):
345        # __del__ methods can trigger collection, make this to happen
346        thresholds = gc.get_threshold()
347        gc.enable()
348        gc.set_threshold(1)
349
350        class A(object):
351            def __del__(self):
352                dir(self)
353        a = A()
354        del a
355
356        gc.disable()
357        gc.set_threshold(*thresholds)
358
359    # The following two tests are fragile:
360    # They precisely count the number of allocations,
361    # which is highly implementation-dependent.
362    # For example, disposed tuples are not freed, but reused.
363    # To minimize variations, though, we first store the get_count() results
364    # and check them at the end.
365    @refcount_test
366    def test_get_count(self):
367        gc.collect()
368        a, b, c = gc.get_count()
369        x = []
370        d, e, f = gc.get_count()
371        self.assertEqual((b, c), (0, 0))
372        self.assertEqual((e, f), (0, 0))
373        # This is less fragile than asserting that a equals 0.
374        self.assertLess(a, 5)
375        # Between the two calls to get_count(), at least one object was
376        # created (the list).
377        self.assertGreater(d, a)
378
379    @refcount_test
380    def test_collect_generations(self):
381        gc.collect()
382        # This object will "trickle" into generation N + 1 after
383        # each call to collect(N)
384        x = []
385        gc.collect(0)
386        # x is now in gen 1
387        a, b, c = gc.get_count()
388        gc.collect(1)
389        # x is now in gen 2
390        d, e, f = gc.get_count()
391        gc.collect(2)
392        # x is now in gen 3
393        g, h, i = gc.get_count()
394        # We don't check a, d, g since their exact values depends on
395        # internal implementation details of the interpreter.
396        self.assertEqual((b, c), (1, 0))
397        self.assertEqual((e, f), (0, 1))
398        self.assertEqual((h, i), (0, 0))
399
400    def test_trashcan(self):
401        class Ouch:
402            n = 0
403            def __del__(self):
404                Ouch.n = Ouch.n + 1
405                if Ouch.n % 17 == 0:
406                    gc.collect()
407
408        # "trashcan" is a hack to prevent stack overflow when deallocating
409        # very deeply nested tuples etc.  It works in part by abusing the
410        # type pointer and refcount fields, and that can yield horrible
411        # problems when gc tries to traverse the structures.
412        # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
413        # most likely die via segfault.
414
415        # Note:  In 2.3 the possibility for compiling without cyclic gc was
416        # removed, and that in turn allows the trashcan mechanism to work
417        # via much simpler means (e.g., it never abuses the type pointer or
418        # refcount fields anymore).  Since it's much less likely to cause a
419        # problem now, the various constants in this expensive (we force a lot
420        # of full collections) test are cut back from the 2.2 version.
421        gc.enable()
422        N = 150
423        for count in range(2):
424            t = []
425            for i in range(N):
426                t = [t, Ouch()]
427            u = []
428            for i in range(N):
429                u = [u, Ouch()]
430            v = {}
431            for i in range(N):
432                v = {1: v, 2: Ouch()}
433        gc.disable()
434
435    @threading_helper.requires_working_threading()
436    def test_trashcan_threads(self):
437        # Issue #13992: trashcan mechanism should be thread-safe
438        NESTING = 60
439        N_THREADS = 2
440
441        def sleeper_gen():
442            """A generator that releases the GIL when closed or dealloc'ed."""
443            try:
444                yield
445            finally:
446                time.sleep(0.000001)
447
448        class C(list):
449            # Appending to a list is atomic, which avoids the use of a lock.
450            inits = []
451            dels = []
452            def __init__(self, alist):
453                self[:] = alist
454                C.inits.append(None)
455            def __del__(self):
456                # This __del__ is called by subtype_dealloc().
457                C.dels.append(None)
458                # `g` will release the GIL when garbage-collected.  This
459                # helps assert subtype_dealloc's behaviour when threads
460                # switch in the middle of it.
461                g = sleeper_gen()
462                next(g)
463                # Now that __del__ is finished, subtype_dealloc will proceed
464                # to call list_dealloc, which also uses the trashcan mechanism.
465
466        def make_nested():
467            """Create a sufficiently nested container object so that the
468            trashcan mechanism is invoked when deallocating it."""
469            x = C([])
470            for i in range(NESTING):
471                x = [C([x])]
472            del x
473
474        def run_thread():
475            """Exercise make_nested() in a loop."""
476            while not exit:
477                make_nested()
478
479        old_switchinterval = sys.getswitchinterval()
480        sys.setswitchinterval(1e-5)
481        try:
482            exit = []
483            threads = []
484            for i in range(N_THREADS):
485                t = threading.Thread(target=run_thread)
486                threads.append(t)
487            with threading_helper.start_threads(threads, lambda: exit.append(1)):
488                time.sleep(1.0)
489        finally:
490            sys.setswitchinterval(old_switchinterval)
491        gc.collect()
492        self.assertEqual(len(C.inits), len(C.dels))
493
494    def test_boom(self):
495        class Boom:
496            def __getattr__(self, someattribute):
497                del self.attr
498                raise AttributeError
499
500        a = Boom()
501        b = Boom()
502        a.attr = b
503        b.attr = a
504
505        gc.collect()
506        garbagelen = len(gc.garbage)
507        del a, b
508        # a<->b are in a trash cycle now.  Collection will invoke
509        # Boom.__getattr__ (to see whether a and b have __del__ methods), and
510        # __getattr__ deletes the internal "attr" attributes as a side effect.
511        # That causes the trash cycle to get reclaimed via refcounts falling to
512        # 0, thus mutating the trash graph as a side effect of merely asking
513        # whether __del__ exists.  This used to (before 2.3b1) crash Python.
514        # Now __getattr__ isn't called.
515        self.assertEqual(gc.collect(), 2)
516        self.assertEqual(len(gc.garbage), garbagelen)
517
518    def test_boom2(self):
519        class Boom2:
520            def __init__(self):
521                self.x = 0
522
523            def __getattr__(self, someattribute):
524                self.x += 1
525                if self.x > 1:
526                    del self.attr
527                raise AttributeError
528
529        a = Boom2()
530        b = Boom2()
531        a.attr = b
532        b.attr = a
533
534        gc.collect()
535        garbagelen = len(gc.garbage)
536        del a, b
537        # Much like test_boom(), except that __getattr__ doesn't break the
538        # cycle until the second time gc checks for __del__.  As of 2.3b1,
539        # there isn't a second time, so this simply cleans up the trash cycle.
540        # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
541        # reclaimed this way.
542        self.assertEqual(gc.collect(), 2)
543        self.assertEqual(len(gc.garbage), garbagelen)
544
545    def test_boom_new(self):
546        # boom__new and boom2_new are exactly like boom and boom2, except use
547        # new-style classes.
548
549        class Boom_New(object):
550            def __getattr__(self, someattribute):
551                del self.attr
552                raise AttributeError
553
554        a = Boom_New()
555        b = Boom_New()
556        a.attr = b
557        b.attr = a
558
559        gc.collect()
560        garbagelen = len(gc.garbage)
561        del a, b
562        self.assertEqual(gc.collect(), 2)
563        self.assertEqual(len(gc.garbage), garbagelen)
564
565    def test_boom2_new(self):
566        class Boom2_New(object):
567            def __init__(self):
568                self.x = 0
569
570            def __getattr__(self, someattribute):
571                self.x += 1
572                if self.x > 1:
573                    del self.attr
574                raise AttributeError
575
576        a = Boom2_New()
577        b = Boom2_New()
578        a.attr = b
579        b.attr = a
580
581        gc.collect()
582        garbagelen = len(gc.garbage)
583        del a, b
584        self.assertEqual(gc.collect(), 2)
585        self.assertEqual(len(gc.garbage), garbagelen)
586
587    def test_get_referents(self):
588        alist = [1, 3, 5]
589        got = gc.get_referents(alist)
590        got.sort()
591        self.assertEqual(got, alist)
592
593        atuple = tuple(alist)
594        got = gc.get_referents(atuple)
595        got.sort()
596        self.assertEqual(got, alist)
597
598        adict = {1: 3, 5: 7}
599        expected = [1, 3, 5, 7]
600        got = gc.get_referents(adict)
601        got.sort()
602        self.assertEqual(got, expected)
603
604        got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
605        got.sort()
606        self.assertEqual(got, [0, 0] + list(range(5)))
607
608        self.assertEqual(gc.get_referents(1, 'a', 4j), [])
609
610    def test_is_tracked(self):
611        # Atomic built-in types are not tracked, user-defined objects and
612        # mutable containers are.
613        # NOTE: types with special optimizations (e.g. tuple) have tests
614        # in their own test files instead.
615        self.assertFalse(gc.is_tracked(None))
616        self.assertFalse(gc.is_tracked(1))
617        self.assertFalse(gc.is_tracked(1.0))
618        self.assertFalse(gc.is_tracked(1.0 + 5.0j))
619        self.assertFalse(gc.is_tracked(True))
620        self.assertFalse(gc.is_tracked(False))
621        self.assertFalse(gc.is_tracked(b"a"))
622        self.assertFalse(gc.is_tracked("a"))
623        self.assertFalse(gc.is_tracked(bytearray(b"a")))
624        self.assertFalse(gc.is_tracked(type))
625        self.assertFalse(gc.is_tracked(int))
626        self.assertFalse(gc.is_tracked(object))
627        self.assertFalse(gc.is_tracked(object()))
628
629        class UserClass:
630            pass
631
632        class UserInt(int):
633            pass
634
635        # Base class is object; no extra fields.
636        class UserClassSlots:
637            __slots__ = ()
638
639        # Base class is fixed size larger than object; no extra fields.
640        class UserFloatSlots(float):
641            __slots__ = ()
642
643        # Base class is variable size; no extra fields.
644        class UserIntSlots(int):
645            __slots__ = ()
646
647        self.assertTrue(gc.is_tracked(gc))
648        self.assertTrue(gc.is_tracked(UserClass))
649        self.assertTrue(gc.is_tracked(UserClass()))
650        self.assertTrue(gc.is_tracked(UserInt()))
651        self.assertTrue(gc.is_tracked([]))
652        self.assertTrue(gc.is_tracked(set()))
653        self.assertTrue(gc.is_tracked(UserClassSlots()))
654        self.assertTrue(gc.is_tracked(UserFloatSlots()))
655        self.assertTrue(gc.is_tracked(UserIntSlots()))
656
657    def test_is_finalized(self):
658        # Objects not tracked by the always gc return false
659        self.assertFalse(gc.is_finalized(3))
660
661        storage = []
662        class Lazarus:
663            def __del__(self):
664                storage.append(self)
665
666        lazarus = Lazarus()
667        self.assertFalse(gc.is_finalized(lazarus))
668
669        del lazarus
670        gc.collect()
671
672        lazarus = storage.pop()
673        self.assertTrue(gc.is_finalized(lazarus))
674
675    def test_bug1055820b(self):
676        # Corresponds to temp2b.py in the bug report.
677
678        ouch = []
679        def callback(ignored):
680            ouch[:] = [wr() for wr in WRs]
681
682        Cs = [C1055820(i) for i in range(2)]
683        WRs = [weakref.ref(c, callback) for c in Cs]
684        c = None
685
686        gc.collect()
687        self.assertEqual(len(ouch), 0)
688        # Make the two instances trash, and collect again.  The bug was that
689        # the callback materialized a strong reference to an instance, but gc
690        # cleared the instance's dict anyway.
691        Cs = None
692        gc.collect()
693        self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
694        for x in ouch:
695            # If the callback resurrected one of these guys, the instance
696            # would be damaged, with an empty __dict__.
697            self.assertEqual(x, None)
698
699    def test_bug21435(self):
700        # This is a poor test - its only virtue is that it happened to
701        # segfault on Tim's Windows box before the patch for 21435 was
702        # applied.  That's a nasty bug relying on specific pieces of cyclic
703        # trash appearing in exactly the right order in finalize_garbage()'s
704        # input list.
705        # But there's no reliable way to force that order from Python code,
706        # so over time chances are good this test won't really be testing much
707        # of anything anymore.  Still, if it blows up, there's _some_
708        # problem ;-)
709        gc.collect()
710
711        class A:
712            pass
713
714        class B:
715            def __init__(self, x):
716                self.x = x
717
718            def __del__(self):
719                self.attr = None
720
721        def do_work():
722            a = A()
723            b = B(A())
724
725            a.attr = b
726            b.attr = a
727
728        do_work()
729        gc.collect() # this blows up (bad C pointer) when it fails
730
731    @cpython_only
732    @requires_subprocess()
733    def test_garbage_at_shutdown(self):
734        import subprocess
735        code = """if 1:
736            import gc
737            import _testcapi
738            @_testcapi.with_tp_del
739            class X:
740                def __init__(self, name):
741                    self.name = name
742                def __repr__(self):
743                    return "<X %%r>" %% self.name
744                def __tp_del__(self):
745                    pass
746
747            x = X('first')
748            x.x = x
749            x.y = X('second')
750            del x
751            gc.set_debug(%s)
752        """
753        def run_command(code):
754            p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
755                stdout=subprocess.PIPE,
756                stderr=subprocess.PIPE)
757            stdout, stderr = p.communicate()
758            p.stdout.close()
759            p.stderr.close()
760            self.assertEqual(p.returncode, 0)
761            self.assertEqual(stdout, b"")
762            return stderr
763
764        stderr = run_command(code % "0")
765        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
766                      b"shutdown; use", stderr)
767        self.assertNotIn(b"<X 'first'>", stderr)
768        # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
769        stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
770        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
771                      b"shutdown", stderr)
772        self.assertTrue(
773            (b"[<X 'first'>, <X 'second'>]" in stderr) or
774            (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
775        # With DEBUG_SAVEALL, no additional message should get printed
776        # (because gc.garbage also contains normally reclaimable cyclic
777        # references, and its elements get printed at runtime anyway).
778        stderr = run_command(code % "gc.DEBUG_SAVEALL")
779        self.assertNotIn(b"uncollectable objects at shutdown", stderr)
780
781    def test_gc_main_module_at_shutdown(self):
782        # Create a reference cycle through the __main__ module and check
783        # it gets collected at interpreter shutdown.
784        code = """if 1:
785            class C:
786                def __del__(self):
787                    print('__del__ called')
788            l = [C()]
789            l.append(l)
790            """
791        rc, out, err = assert_python_ok('-c', code)
792        self.assertEqual(out.strip(), b'__del__ called')
793
794    def test_gc_ordinary_module_at_shutdown(self):
795        # Same as above, but with a non-__main__ module.
796        with temp_dir() as script_dir:
797            module = """if 1:
798                class C:
799                    def __del__(self):
800                        print('__del__ called')
801                l = [C()]
802                l.append(l)
803                """
804            code = """if 1:
805                import sys
806                sys.path.insert(0, %r)
807                import gctest
808                """ % (script_dir,)
809            make_script(script_dir, 'gctest', module)
810            rc, out, err = assert_python_ok('-c', code)
811            self.assertEqual(out.strip(), b'__del__ called')
812
813    def test_global_del_SystemExit(self):
814        code = """if 1:
815            class ClassWithDel:
816                def __del__(self):
817                    print('__del__ called')
818            a = ClassWithDel()
819            a.link = a
820            raise SystemExit(0)"""
821        self.addCleanup(unlink, TESTFN)
822        with open(TESTFN, 'w', encoding="utf-8") as script:
823            script.write(code)
824        rc, out, err = assert_python_ok(TESTFN)
825        self.assertEqual(out.strip(), b'__del__ called')
826
827    def test_get_stats(self):
828        stats = gc.get_stats()
829        self.assertEqual(len(stats), 3)
830        for st in stats:
831            self.assertIsInstance(st, dict)
832            self.assertEqual(set(st),
833                             {"collected", "collections", "uncollectable"})
834            self.assertGreaterEqual(st["collected"], 0)
835            self.assertGreaterEqual(st["collections"], 0)
836            self.assertGreaterEqual(st["uncollectable"], 0)
837        # Check that collection counts are incremented correctly
838        if gc.isenabled():
839            self.addCleanup(gc.enable)
840            gc.disable()
841        old = gc.get_stats()
842        gc.collect(0)
843        new = gc.get_stats()
844        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
845        self.assertEqual(new[1]["collections"], old[1]["collections"])
846        self.assertEqual(new[2]["collections"], old[2]["collections"])
847        gc.collect(2)
848        new = gc.get_stats()
849        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
850        self.assertEqual(new[1]["collections"], old[1]["collections"])
851        self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
852
853    def test_freeze(self):
854        gc.freeze()
855        self.assertGreater(gc.get_freeze_count(), 0)
856        gc.unfreeze()
857        self.assertEqual(gc.get_freeze_count(), 0)
858
859    def test_get_objects(self):
860        gc.collect()
861        l = []
862        l.append(l)
863        self.assertTrue(
864                any(l is element for element in gc.get_objects(generation=0))
865        )
866        self.assertFalse(
867                any(l is element for element in  gc.get_objects(generation=1))
868        )
869        self.assertFalse(
870                any(l is element for element in gc.get_objects(generation=2))
871        )
872        gc.collect(generation=0)
873        self.assertFalse(
874                any(l is element for element in gc.get_objects(generation=0))
875        )
876        self.assertTrue(
877                any(l is element for element in  gc.get_objects(generation=1))
878        )
879        self.assertFalse(
880                any(l is element for element in gc.get_objects(generation=2))
881        )
882        gc.collect(generation=1)
883        self.assertFalse(
884                any(l is element for element in gc.get_objects(generation=0))
885        )
886        self.assertFalse(
887                any(l is element for element in  gc.get_objects(generation=1))
888        )
889        self.assertTrue(
890                any(l is element for element in gc.get_objects(generation=2))
891        )
892        gc.collect(generation=2)
893        self.assertFalse(
894                any(l is element for element in gc.get_objects(generation=0))
895        )
896        self.assertFalse(
897                any(l is element for element in  gc.get_objects(generation=1))
898        )
899        self.assertTrue(
900                any(l is element for element in gc.get_objects(generation=2))
901        )
902        del l
903        gc.collect()
904
905    def test_get_objects_arguments(self):
906        gc.collect()
907        self.assertEqual(len(gc.get_objects()),
908                         len(gc.get_objects(generation=None)))
909
910        self.assertRaises(ValueError, gc.get_objects, 1000)
911        self.assertRaises(ValueError, gc.get_objects, -1000)
912        self.assertRaises(TypeError, gc.get_objects, "1")
913        self.assertRaises(TypeError, gc.get_objects, 1.234)
914
915    def test_resurrection_only_happens_once_per_object(self):
916        class A:  # simple self-loop
917            def __init__(self):
918                self.me = self
919
920        class Lazarus(A):
921            resurrected = 0
922            resurrected_instances = []
923
924            def __del__(self):
925                Lazarus.resurrected += 1
926                Lazarus.resurrected_instances.append(self)
927
928        gc.collect()
929        gc.disable()
930
931        # We start with 0 resurrections
932        laz = Lazarus()
933        self.assertEqual(Lazarus.resurrected, 0)
934
935        # Deleting the instance and triggering a collection
936        # resurrects the object
937        del laz
938        gc.collect()
939        self.assertEqual(Lazarus.resurrected, 1)
940        self.assertEqual(len(Lazarus.resurrected_instances), 1)
941
942        # Clearing the references and forcing a collection
943        # should not resurrect the object again.
944        Lazarus.resurrected_instances.clear()
945        self.assertEqual(Lazarus.resurrected, 1)
946        gc.collect()
947        self.assertEqual(Lazarus.resurrected, 1)
948
949        gc.enable()
950
951    def test_resurrection_is_transitive(self):
952        class Cargo:
953            def __init__(self):
954                self.me = self
955
956        class Lazarus:
957            resurrected_instances = []
958
959            def __del__(self):
960                Lazarus.resurrected_instances.append(self)
961
962        gc.collect()
963        gc.disable()
964
965        laz = Lazarus()
966        cargo = Cargo()
967        cargo_id = id(cargo)
968
969        # Create a cycle between cargo and laz
970        laz.cargo = cargo
971        cargo.laz = laz
972
973        # Drop the references, force a collection and check that
974        # everything was resurrected.
975        del laz, cargo
976        gc.collect()
977        self.assertEqual(len(Lazarus.resurrected_instances), 1)
978        instance = Lazarus.resurrected_instances.pop()
979        self.assertTrue(hasattr(instance, "cargo"))
980        self.assertEqual(id(instance.cargo), cargo_id)
981
982        gc.collect()
983        gc.enable()
984
985    def test_resurrection_does_not_block_cleanup_of_other_objects(self):
986
987        # When a finalizer resurrects objects, stats were reporting them as
988        # having been collected.  This affected both collect()'s return
989        # value and the dicts returned by get_stats().
990        N = 100
991
992        class A:  # simple self-loop
993            def __init__(self):
994                self.me = self
995
996        class Z(A):  # resurrecting __del__
997            def __del__(self):
998                zs.append(self)
999
1000        zs = []
1001
1002        def getstats():
1003            d = gc.get_stats()[-1]
1004            return d['collected'], d['uncollectable']
1005
1006        gc.collect()
1007        gc.disable()
1008
1009        # No problems if just collecting A() instances.
1010        oldc, oldnc = getstats()
1011        for i in range(N):
1012            A()
1013        t = gc.collect()
1014        c, nc = getstats()
1015        self.assertEqual(t, N) # instance objects
1016        self.assertEqual(c - oldc, N)
1017        self.assertEqual(nc - oldnc, 0)
1018
1019        # But Z() is not actually collected.
1020        oldc, oldnc = c, nc
1021        Z()
1022        # Nothing is collected - Z() is merely resurrected.
1023        t = gc.collect()
1024        c, nc = getstats()
1025        self.assertEqual(t, 0)
1026        self.assertEqual(c - oldc, 0)
1027        self.assertEqual(nc - oldnc, 0)
1028
1029        # Z() should not prevent anything else from being collected.
1030        oldc, oldnc = c, nc
1031        for i in range(N):
1032            A()
1033        Z()
1034        t = gc.collect()
1035        c, nc = getstats()
1036        self.assertEqual(t, N)
1037        self.assertEqual(c - oldc, N)
1038        self.assertEqual(nc - oldnc, 0)
1039
1040        # The A() trash should have been reclaimed already but the
1041        # 2 copies of Z are still in zs (and the associated dicts).
1042        oldc, oldnc = c, nc
1043        zs.clear()
1044        t = gc.collect()
1045        c, nc = getstats()
1046        self.assertEqual(t, 2)
1047        self.assertEqual(c - oldc, 2)
1048        self.assertEqual(nc - oldnc, 0)
1049
1050        gc.enable()
1051
1052    @unittest.skipIf(ContainerNoGC is None,
1053                     'requires ContainerNoGC extension type')
1054    def test_trash_weakref_clear(self):
1055        # Test that trash weakrefs are properly cleared (bpo-38006).
1056        #
1057        # Structure we are creating:
1058        #
1059        #   Z <- Y <- A--+--> WZ -> C
1060        #             ^  |
1061        #             +--+
1062        # where:
1063        #   WZ is a weakref to Z with callback C
1064        #   Y doesn't implement tp_traverse
1065        #   A contains a reference to itself, Y and WZ
1066        #
1067        # A, Y, Z, WZ are all trash.  The GC doesn't know that Z is trash
1068        # because Y does not implement tp_traverse.  To show the bug, WZ needs
1069        # to live long enough so that Z is deallocated before it.  Then, if
1070        # gcmodule is buggy, when Z is being deallocated, C will run.
1071        #
1072        # To ensure WZ lives long enough, we put it in a second reference
1073        # cycle.  That trick only works due to the ordering of the GC prev/next
1074        # linked lists.  So, this test is a bit fragile.
1075        #
1076        # The bug reported in bpo-38006 is caused because the GC did not
1077        # clear WZ before starting the process of calling tp_clear on the
1078        # trash.  Normally, handle_weakrefs() would find the weakref via Z and
1079        # clear it.  However, since the GC cannot find Z, WR is not cleared and
1080        # it can execute during delete_garbage().  That can lead to disaster
1081        # since the callback might tinker with objects that have already had
1082        # tp_clear called on them (leaving them in possibly invalid states).
1083
1084        callback = unittest.mock.Mock()
1085
1086        class A:
1087            __slots__ = ['a', 'y', 'wz']
1088
1089        class Z:
1090            pass
1091
1092        # setup required object graph, as described above
1093        a = A()
1094        a.a = a
1095        a.y = ContainerNoGC(Z())
1096        a.wz = weakref.ref(a.y.value, callback)
1097        # create second cycle to keep WZ alive longer
1098        wr_cycle = [a.wz]
1099        wr_cycle.append(wr_cycle)
1100        # ensure trash unrelated to this test is gone
1101        gc.collect()
1102        gc.disable()
1103        # release references and create trash
1104        del a, wr_cycle
1105        gc.collect()
1106        # if called, it means there is a bug in the GC.  The weakref should be
1107        # cleared before Z dies.
1108        callback.assert_not_called()
1109        gc.enable()
1110
1111
1112class GCCallbackTests(unittest.TestCase):
1113    def setUp(self):
1114        # Save gc state and disable it.
1115        self.enabled = gc.isenabled()
1116        gc.disable()
1117        self.debug = gc.get_debug()
1118        gc.set_debug(0)
1119        gc.callbacks.append(self.cb1)
1120        gc.callbacks.append(self.cb2)
1121        self.othergarbage = []
1122
1123    def tearDown(self):
1124        # Restore gc state
1125        del self.visit
1126        gc.callbacks.remove(self.cb1)
1127        gc.callbacks.remove(self.cb2)
1128        gc.set_debug(self.debug)
1129        if self.enabled:
1130            gc.enable()
1131        # destroy any uncollectables
1132        gc.collect()
1133        for obj in gc.garbage:
1134            if isinstance(obj, Uncollectable):
1135                obj.partner = None
1136        del gc.garbage[:]
1137        del self.othergarbage
1138        gc.collect()
1139
1140    def preclean(self):
1141        # Remove all fluff from the system.  Invoke this function
1142        # manually rather than through self.setUp() for maximum
1143        # safety.
1144        self.visit = []
1145        gc.collect()
1146        garbage, gc.garbage[:] = gc.garbage[:], []
1147        self.othergarbage.append(garbage)
1148        self.visit = []
1149
1150    def cb1(self, phase, info):
1151        self.visit.append((1, phase, dict(info)))
1152
1153    def cb2(self, phase, info):
1154        self.visit.append((2, phase, dict(info)))
1155        if phase == "stop" and hasattr(self, "cleanup"):
1156            # Clean Uncollectable from garbage
1157            uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
1158            gc.garbage[:] = [e for e in gc.garbage
1159                             if not isinstance(e, Uncollectable)]
1160            for e in uc:
1161                e.partner = None
1162
1163    def test_collect(self):
1164        self.preclean()
1165        gc.collect()
1166        # Algorithmically verify the contents of self.visit
1167        # because it is long and tortuous.
1168
1169        # Count the number of visits to each callback
1170        n = [v[0] for v in self.visit]
1171        n1 = [i for i in n if i == 1]
1172        n2 = [i for i in n if i == 2]
1173        self.assertEqual(n1, [1]*2)
1174        self.assertEqual(n2, [2]*2)
1175
1176        # Count that we got the right number of start and stop callbacks.
1177        n = [v[1] for v in self.visit]
1178        n1 = [i for i in n if i == "start"]
1179        n2 = [i for i in n if i == "stop"]
1180        self.assertEqual(n1, ["start"]*2)
1181        self.assertEqual(n2, ["stop"]*2)
1182
1183        # Check that we got the right info dict for all callbacks
1184        for v in self.visit:
1185            info = v[2]
1186            self.assertTrue("generation" in info)
1187            self.assertTrue("collected" in info)
1188            self.assertTrue("uncollectable" in info)
1189
1190    def test_collect_generation(self):
1191        self.preclean()
1192        gc.collect(2)
1193        for v in self.visit:
1194            info = v[2]
1195            self.assertEqual(info["generation"], 2)
1196
1197    @cpython_only
1198    def test_collect_garbage(self):
1199        self.preclean()
1200        # Each of these cause two objects to be garbage:
1201        Uncollectable()
1202        Uncollectable()
1203        C1055820(666)
1204        gc.collect()
1205        for v in self.visit:
1206            if v[1] != "stop":
1207                continue
1208            info = v[2]
1209            self.assertEqual(info["collected"], 1)
1210            self.assertEqual(info["uncollectable"], 4)
1211
1212        # We should now have the Uncollectables in gc.garbage
1213        self.assertEqual(len(gc.garbage), 4)
1214        for e in gc.garbage:
1215            self.assertIsInstance(e, Uncollectable)
1216
1217        # Now, let our callback handle the Uncollectable instances
1218        self.cleanup=True
1219        self.visit = []
1220        gc.garbage[:] = []
1221        gc.collect()
1222        for v in self.visit:
1223            if v[1] != "stop":
1224                continue
1225            info = v[2]
1226            self.assertEqual(info["collected"], 0)
1227            self.assertEqual(info["uncollectable"], 2)
1228
1229        # Uncollectables should be gone
1230        self.assertEqual(len(gc.garbage), 0)
1231
1232
1233    @unittest.skipIf(BUILD_WITH_NDEBUG,
1234                     'built with -NDEBUG')
1235    def test_refcount_errors(self):
1236        self.preclean()
1237        # Verify the "handling" of objects with broken refcounts
1238
1239        # Skip the test if ctypes is not available
1240        import_module("ctypes")
1241
1242        import subprocess
1243        code = textwrap.dedent('''
1244            from test.support import gc_collect, SuppressCrashReport
1245
1246            a = [1, 2, 3]
1247            b = [a]
1248
1249            # Avoid coredump when Py_FatalError() calls abort()
1250            SuppressCrashReport().__enter__()
1251
1252            # Simulate the refcount of "a" being too low (compared to the
1253            # references held on it by live data), but keeping it above zero
1254            # (to avoid deallocating it):
1255            import ctypes
1256            ctypes.pythonapi.Py_DecRef(ctypes.py_object(a))
1257
1258            # The garbage collector should now have a fatal error
1259            # when it reaches the broken object
1260            gc_collect()
1261        ''')
1262        p = subprocess.Popen([sys.executable, "-c", code],
1263                             stdout=subprocess.PIPE,
1264                             stderr=subprocess.PIPE)
1265        stdout, stderr = p.communicate()
1266        p.stdout.close()
1267        p.stderr.close()
1268        # Verify that stderr has a useful error message:
1269        self.assertRegex(stderr,
1270            br'gcmodule\.c:[0-9]+: gc_decref: Assertion "gc_get_refs\(g\) > 0" failed.')
1271        self.assertRegex(stderr,
1272            br'refcount is too small')
1273        # "address : 0x7fb5062efc18"
1274        # "address : 7FB5062EFC18"
1275        address_regex = br'[0-9a-fA-Fx]+'
1276        self.assertRegex(stderr,
1277            br'object address  : ' + address_regex)
1278        self.assertRegex(stderr,
1279            br'object refcount : 1')
1280        self.assertRegex(stderr,
1281            br'object type     : ' + address_regex)
1282        self.assertRegex(stderr,
1283            br'object type name: list')
1284        self.assertRegex(stderr,
1285            br'object repr     : \[1, 2, 3\]')
1286
1287
1288class GCTogglingTests(unittest.TestCase):
1289    def setUp(self):
1290        gc.enable()
1291
1292    def tearDown(self):
1293        gc.disable()
1294
1295    def test_bug1055820c(self):
1296        # Corresponds to temp2c.py in the bug report.  This is pretty
1297        # elaborate.
1298
1299        c0 = C1055820(0)
1300        # Move c0 into generation 2.
1301        gc.collect()
1302
1303        c1 = C1055820(1)
1304        c1.keep_c0_alive = c0
1305        del c0.loop # now only c1 keeps c0 alive
1306
1307        c2 = C1055820(2)
1308        c2wr = weakref.ref(c2) # no callback!
1309
1310        ouch = []
1311        def callback(ignored):
1312            ouch[:] = [c2wr()]
1313
1314        # The callback gets associated with a wr on an object in generation 2.
1315        c0wr = weakref.ref(c0, callback)
1316
1317        c0 = c1 = c2 = None
1318
1319        # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
1320        # generation 2.  The only thing keeping it alive is that c1 points to
1321        # it. c1 and c2 are in generation 0, and are in self-loops.  There's a
1322        # global weakref to c2 (c2wr), but that weakref has no callback.
1323        # There's also a global weakref to c0 (c0wr), and that does have a
1324        # callback, and that callback references c2 via c2wr().
1325        #
1326        #               c0 has a wr with callback, which references c2wr
1327        #               ^
1328        #               |
1329        #               |     Generation 2 above dots
1330        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
1331        #               |     Generation 0 below dots
1332        #               |
1333        #               |
1334        #            ^->c1   ^->c2 has a wr but no callback
1335        #            |  |    |  |
1336        #            <--v    <--v
1337        #
1338        # So this is the nightmare:  when generation 0 gets collected, we see
1339        # that c2 has a callback-free weakref, and c1 doesn't even have a
1340        # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
1341        # the only object that has a weakref with a callback.  gc clears c1
1342        # and c2.  Clearing c1 has the side effect of dropping the refcount on
1343        # c0 to 0, so c0 goes away (despite that it's in an older generation)
1344        # and c0's wr callback triggers.  That in turn materializes a reference
1345        # to c2 via c2wr(), but c2 gets cleared anyway by gc.
1346
1347        # We want to let gc happen "naturally", to preserve the distinction
1348        # between generations.
1349        junk = []
1350        i = 0
1351        detector = GC_Detector()
1352        while not detector.gc_happened:
1353            i += 1
1354            if i > 10000:
1355                self.fail("gc didn't happen after 10000 iterations")
1356            self.assertEqual(len(ouch), 0)
1357            junk.append([])  # this will eventually trigger gc
1358
1359        self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
1360        for x in ouch:
1361            # If the callback resurrected c2, the instance would be damaged,
1362            # with an empty __dict__.
1363            self.assertEqual(x, None)
1364
1365    def test_bug1055820d(self):
1366        # Corresponds to temp2d.py in the bug report.  This is very much like
1367        # test_bug1055820c, but uses a __del__ method instead of a weakref
1368        # callback to sneak in a resurrection of cyclic trash.
1369
1370        ouch = []
1371        class D(C1055820):
1372            def __del__(self):
1373                ouch[:] = [c2wr()]
1374
1375        d0 = D(0)
1376        # Move all the above into generation 2.
1377        gc.collect()
1378
1379        c1 = C1055820(1)
1380        c1.keep_d0_alive = d0
1381        del d0.loop # now only c1 keeps d0 alive
1382
1383        c2 = C1055820(2)
1384        c2wr = weakref.ref(c2) # no callback!
1385
1386        d0 = c1 = c2 = None
1387
1388        # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
1389        # generation 2.  The only thing keeping it alive is that c1 points to
1390        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
1391        # a global weakref to c2 (c2wr), but that weakref has no callback.
1392        # There are no other weakrefs.
1393        #
1394        #               d0 has a __del__ method that references c2wr
1395        #               ^
1396        #               |
1397        #               |     Generation 2 above dots
1398        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
1399        #               |     Generation 0 below dots
1400        #               |
1401        #               |
1402        #            ^->c1   ^->c2 has a wr but no callback
1403        #            |  |    |  |
1404        #            <--v    <--v
1405        #
1406        # So this is the nightmare:  when generation 0 gets collected, we see
1407        # that c2 has a callback-free weakref, and c1 doesn't even have a
1408        # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
1409        # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
1410        # on d0 to 0, so d0 goes away (despite that it's in an older
1411        # generation) and d0's __del__ triggers.  That in turn materializes
1412        # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.
1413
1414        # We want to let gc happen "naturally", to preserve the distinction
1415        # between generations.
1416        detector = GC_Detector()
1417        junk = []
1418        i = 0
1419        while not detector.gc_happened:
1420            i += 1
1421            if i > 10000:
1422                self.fail("gc didn't happen after 10000 iterations")
1423            self.assertEqual(len(ouch), 0)
1424            junk.append([])  # this will eventually trigger gc
1425
1426        self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
1427        for x in ouch:
1428            # If __del__ resurrected c2, the instance would be damaged, with an
1429            # empty __dict__.
1430            self.assertEqual(x, None)
1431
1432
1433class PythonFinalizationTests(unittest.TestCase):
1434    def test_ast_fini(self):
1435        # bpo-44184: Regression test for subtype_dealloc() when deallocating
1436        # an AST instance also destroy its AST type: subtype_dealloc() must
1437        # not access the type memory after deallocating the instance, since
1438        # the type memory can be freed as well. The test is also related to
1439        # _PyAST_Fini() which clears references to AST types.
1440        code = textwrap.dedent("""
1441            import ast
1442            import codecs
1443
1444            # Small AST tree to keep their AST types alive
1445            tree = ast.parse("def f(x, y): return 2*x-y")
1446            x = [tree]
1447            x.append(x)
1448
1449            # Put the cycle somewhere to survive until the last GC collection.
1450            # Codec search functions are only cleared at the end of
1451            # interpreter_clear().
1452            def search_func(encoding):
1453                return None
1454            search_func.a = x
1455            codecs.register(search_func)
1456        """)
1457        assert_python_ok("-c", code)
1458
1459
1460def setUpModule():
1461    global enabled, debug
1462    enabled = gc.isenabled()
1463    gc.disable()
1464    assert not gc.isenabled()
1465    debug = gc.get_debug()
1466    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
1467    gc.collect() # Delete 2nd generation garbage
1468
1469
1470def tearDownModule():
1471    gc.set_debug(debug)
1472    # test gc.enable() even if GC is disabled by default
1473    if verbose:
1474        print("restoring automatic collection")
1475    # make sure to always test gc.enable()
1476    gc.enable()
1477    assert gc.isenabled()
1478    if not enabled:
1479        gc.disable()
1480
1481
1482if __name__ == "__main__":
1483    unittest.main()
1484