1"""This test checks for correct fork() behavior.
2"""
3
4import _imp as imp
5import os
6import signal
7import sys
8import threading
9import time
10import unittest
11
12from test.fork_wait import ForkWait
13from test import support
14
15
16# Skip test if fork does not exist.
17if not support.has_fork_support:
18    raise unittest.SkipTest("test module requires working os.fork")
19
20
21class ForkTest(ForkWait):
22    def test_threaded_import_lock_fork(self):
23        """Check fork() in main thread works while a subthread is doing an import"""
24        import_started = threading.Event()
25        fake_module_name = "fake test module"
26        partial_module = "partial"
27        complete_module = "complete"
28        def importer():
29            imp.acquire_lock()
30            sys.modules[fake_module_name] = partial_module
31            import_started.set()
32            time.sleep(0.01) # Give the other thread time to try and acquire.
33            sys.modules[fake_module_name] = complete_module
34            imp.release_lock()
35        t = threading.Thread(target=importer)
36        t.start()
37        import_started.wait()
38        exitcode = 42
39        pid = os.fork()
40        try:
41            # PyOS_BeforeFork should have waited for the import to complete
42            # before forking, so the child can recreate the import lock
43            # correctly, but also won't see a partially initialised module
44            if not pid:
45                m = __import__(fake_module_name)
46                if m == complete_module:
47                    os._exit(exitcode)
48                else:
49                    if support.verbose > 1:
50                        print("Child encountered partial module")
51                    os._exit(1)
52            else:
53                t.join()
54                # Exitcode 1 means the child got a partial module (bad.) No
55                # exitcode (but a hang, which manifests as 'got pid 0')
56                # means the child deadlocked (also bad.)
57                self.wait_impl(pid, exitcode=exitcode)
58        finally:
59            try:
60                os.kill(pid, signal.SIGKILL)
61            except OSError:
62                pass
63
64
65    def test_nested_import_lock_fork(self):
66        """Check fork() in main thread works while the main thread is doing an import"""
67        exitcode = 42
68        # Issue 9573: this used to trigger RuntimeError in the child process
69        def fork_with_import_lock(level):
70            release = 0
71            in_child = False
72            try:
73                try:
74                    for i in range(level):
75                        imp.acquire_lock()
76                        release += 1
77                    pid = os.fork()
78                    in_child = not pid
79                finally:
80                    for i in range(release):
81                        imp.release_lock()
82            except RuntimeError:
83                if in_child:
84                    if support.verbose > 1:
85                        print("RuntimeError in child")
86                    os._exit(1)
87                raise
88            if in_child:
89                os._exit(exitcode)
90            self.wait_impl(pid, exitcode=exitcode)
91
92        # Check this works with various levels of nested
93        # import in the main thread
94        for level in range(5):
95            fork_with_import_lock(level)
96
97
98def tearDownModule():
99    support.reap_children()
100
101if __name__ == "__main__":
102    unittest.main()
103