1"""Tests for sys.audit and sys.addaudithook
2"""
3
4import subprocess
5import sys
6import unittest
7from test import support
8from test.support import import_helper
9from test.support import os_helper
10
11
# The whole module is pointless unless both halves of the audit API
# (raising events and installing hooks) exist on this interpreter.
if not all(hasattr(sys, attr) for attr in ("addaudithook", "audit")):
    raise unittest.SkipTest("test only relevant when sys.audit is available")

# Location of the helper script that the tests below execute in a
# child interpreter.
AUDIT_TESTS_PY = support.findfile("audit-tests.py")
16
17
18class AuditTest(unittest.TestCase):
19    maxDiff = None
20
21    @support.requires_subprocess()
22    def do_test(self, *args):
23        with subprocess.Popen(
24            [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
25            encoding="utf-8",
26            stdout=subprocess.PIPE,
27            stderr=subprocess.PIPE,
28        ) as p:
29            p.wait()
30            sys.stdout.writelines(p.stdout)
31            sys.stderr.writelines(p.stderr)
32            if p.returncode:
33                self.fail("".join(p.stderr))
34
35    @support.requires_subprocess()
36    def run_python(self, *args):
37        events = []
38        with subprocess.Popen(
39            [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
40            encoding="utf-8",
41            stdout=subprocess.PIPE,
42            stderr=subprocess.PIPE,
43        ) as p:
44            p.wait()
45            sys.stderr.writelines(p.stderr)
46            return (
47                p.returncode,
48                [line.strip().partition(" ") for line in p.stdout],
49                "".join(p.stderr),
50            )
51
52    def test_basic(self):
53        self.do_test("test_basic")
54
55    def test_block_add_hook(self):
56        self.do_test("test_block_add_hook")
57
58    def test_block_add_hook_baseexception(self):
59        self.do_test("test_block_add_hook_baseexception")
60
61    def test_marshal(self):
62        import_helper.import_module("marshal")
63
64        self.do_test("test_marshal")
65
66    def test_pickle(self):
67        import_helper.import_module("pickle")
68
69        self.do_test("test_pickle")
70
71    def test_monkeypatch(self):
72        self.do_test("test_monkeypatch")
73
74    def test_open(self):
75        self.do_test("test_open", os_helper.TESTFN)
76
77    def test_cantrace(self):
78        self.do_test("test_cantrace")
79
80    def test_mmap(self):
81        self.do_test("test_mmap")
82
83    def test_excepthook(self):
84        returncode, events, stderr = self.run_python("test_excepthook")
85        if not returncode:
86            self.fail(f"Expected fatal exception\n{stderr}")
87
88        self.assertSequenceEqual(
89            [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
90        )
91
92    def test_unraisablehook(self):
93        returncode, events, stderr = self.run_python("test_unraisablehook")
94        if returncode:
95            self.fail(stderr)
96
97        self.assertEqual(events[0][0], "sys.unraisablehook")
98        self.assertEqual(
99            events[0][2],
100            "RuntimeError('nonfatal-error') Exception ignored for audit hook test",
101        )
102
103    def test_winreg(self):
104        import_helper.import_module("winreg")
105        returncode, events, stderr = self.run_python("test_winreg")
106        if returncode:
107            self.fail(stderr)
108
109        self.assertEqual(events[0][0], "winreg.OpenKey")
110        self.assertEqual(events[1][0], "winreg.OpenKey/result")
111        expected = events[1][2]
112        self.assertTrue(expected)
113        self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
114        self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
115        self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])
116
117    def test_socket(self):
118        import_helper.import_module("socket")
119        returncode, events, stderr = self.run_python("test_socket")
120        if returncode:
121            self.fail(stderr)
122
123        if support.verbose:
124            print(*events, sep='\n')
125        self.assertEqual(events[0][0], "socket.gethostname")
126        self.assertEqual(events[1][0], "socket.__new__")
127        self.assertEqual(events[2][0], "socket.bind")
128        self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))
129
130    def test_gc(self):
131        returncode, events, stderr = self.run_python("test_gc")
132        if returncode:
133            self.fail(stderr)
134
135        if support.verbose:
136            print(*events, sep='\n')
137        self.assertEqual(
138            [event[0] for event in events],
139            ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
140        )
141
142
143    def test_http(self):
144        import_helper.import_module("http.client")
145        returncode, events, stderr = self.run_python("test_http_client")
146        if returncode:
147            self.fail(stderr)
148
149        if support.verbose:
150            print(*events, sep='\n')
151        self.assertEqual(events[0][0], "http.client.connect")
152        self.assertEqual(events[0][2], "www.python.org 80")
153        self.assertEqual(events[1][0], "http.client.send")
154        if events[1][2] != '[cannot send]':
155            self.assertIn('HTTP', events[1][2])
156
157
158    def test_sqlite3(self):
159        sqlite3 = import_helper.import_module("sqlite3")
160        returncode, events, stderr = self.run_python("test_sqlite3")
161        if returncode:
162            self.fail(stderr)
163
164        if support.verbose:
165            print(*events, sep='\n')
166        actual = [ev[0] for ev in events]
167        expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2
168
169        if hasattr(sqlite3.Connection, "enable_load_extension"):
170            expected += [
171                "sqlite3.enable_load_extension",
172                "sqlite3.load_extension",
173            ]
174        self.assertEqual(actual, expected)
175
176
177    def test_sys_getframe(self):
178        returncode, events, stderr = self.run_python("test_sys_getframe")
179        if returncode:
180            self.fail(stderr)
181
182        if support.verbose:
183            print(*events, sep='\n')
184        actual = [(ev[0], ev[2]) for ev in events]
185        expected = [("sys._getframe", "test_sys_getframe")]
186
187        self.assertEqual(actual, expected)
188
189    def test_syslog(self):
190        syslog = import_helper.import_module("syslog")
191
192        returncode, events, stderr = self.run_python("test_syslog")
193        if returncode:
194            self.fail(stderr)
195
196        if support.verbose:
197            print('Events:', *events, sep='\n  ')
198
199        self.assertSequenceEqual(
200            events,
201            [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
202            ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
203            ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
204            ('syslog.closelog', '', ''),
205            ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
206            ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
207            ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
208            ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
209            ('syslog.closelog', '', '')]
210        )
211
212    def test_not_in_gc(self):
213        returncode, _, stderr = self.run_python("test_not_in_gc")
214        if returncode:
215            self.fail(stderr)
216
217
# Run the tests with unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
220