"""Tests for sys.audit and sys.addaudithook
"""

import io
import subprocess
import sys
import unittest
from test import support
from test.support import import_helper
from test.support import os_helper


if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
    raise unittest.SkipTest("test only relevant when sys.audit is available")

AUDIT_TESTS_PY = support.findfile("audit-tests.py")


class AuditTest(unittest.TestCase):
    """Run the scenarios in ``audit-tests.py`` in a child interpreter.

    Each test method passes a scenario name to the script and then checks
    either the exit status alone (``do_test``) or the event lines the
    script printed on stdout (``run_python``).
    """

    maxDiff = None

    def _run_audit_script(self, *args):
        """Run ``audit-tests.py`` with *args* and collect its output.

        Returns a ``(returncode, stdout, stderr)`` tuple where both streams
        are fully decoded strings.
        """
        with subprocess.Popen(
            [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
            encoding="utf-8",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as p:
            # communicate() drains both pipes while waiting.  A bare wait()
            # with undrained PIPEs can block forever once the child fills
            # an OS pipe buffer.
            stdout, stderr = p.communicate()
        return p.returncode, stdout, stderr

    @support.requires_subprocess()
    def do_test(self, *args):
        """Run a scenario and fail the test if the child exits non-zero.

        The child's stdout and stderr are echoed to this process's
        ``sys.stdout`` and ``sys.stderr``; on failure the child's stderr
        text is used as the failure message.
        """
        returncode, stdout, stderr = self._run_audit_script(*args)
        sys.stdout.write(stdout)
        sys.stderr.write(stderr)
        if returncode:
            # Use the captured text: a pipe can only be read once, so
            # re-reading it after echoing would yield an empty message.
            self.fail(stderr)

    @support.requires_subprocess()
    def run_python(self, *args):
        """Run a scenario and return ``(returncode, events, stderr)``.

        ``events`` holds one 3-tuple per stdout line, produced by
        ``line.strip().partition(" ")``: the text before the first space,
        the separator, and the remainder.  ``stderr`` is the child's
        complete stderr text, which is also echoed to ``sys.stderr``.
        """
        returncode, stdout, stderr = self._run_audit_script(*args)
        sys.stderr.write(stderr)
        # StringIO iteration splits on "\n" only, matching how the text
        # pipe itself would have been iterated line by line.
        events = [line.strip().partition(" ") for line in io.StringIO(stdout)]
        return returncode, events, stderr

    def test_basic(self):
        self.do_test("test_basic")

    def test_block_add_hook(self):
        self.do_test("test_block_add_hook")

    def test_block_add_hook_baseexception(self):
        self.do_test("test_block_add_hook_baseexception")

    def test_marshal(self):
        import_helper.import_module("marshal")

        self.do_test("test_marshal")

    def test_pickle(self):
        import_helper.import_module("pickle")

        self.do_test("test_pickle")

    def test_monkeypatch(self):
        self.do_test("test_monkeypatch")

    def test_open(self):
        self.do_test("test_open", os_helper.TESTFN)

    def test_cantrace(self):
        self.do_test("test_cantrace")

    def test_mmap(self):
        self.do_test("test_mmap")

    def test_excepthook(self):
        returncode, events, stderr = self.run_python("test_excepthook")
        # This scenario is expected to end with a non-zero exit status.
        if not returncode:
            self.fail(f"Expected fatal exception\n{stderr}")

        self.assertSequenceEqual(
            [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
        )

    def test_unraisablehook(self):
        returncode, events, stderr = self.run_python("test_unraisablehook")
        if returncode:
            self.fail(stderr)

        self.assertEqual(events[0][0], "sys.unraisablehook")
        self.assertEqual(
            events[0][2],
            "RuntimeError('nonfatal-error') Exception ignored for audit hook test",
        )

    def test_winreg(self):
        import_helper.import_module("winreg")
        returncode, events, stderr = self.run_python("test_winreg")
        if returncode:
            self.fail(stderr)

        self.assertEqual(events[0][0], "winreg.OpenKey")
        self.assertEqual(events[1][0], "winreg.OpenKey/result")
        # The value reported by the OpenKey/result event must be non-empty
        # and is expected to reappear in the following events.
        expected = events[1][2]
        self.assertTrue(expected)
        self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
        self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
        self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])

    def test_socket(self):
        import_helper.import_module("socket")
        returncode, events, stderr = self.run_python("test_socket")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(events[0][0], "socket.gethostname")
        self.assertEqual(events[1][0], "socket.__new__")
        self.assertEqual(events[2][0], "socket.bind")
        self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))

    def test_gc(self):
        returncode, events, stderr = self.run_python("test_gc")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(
            [event[0] for event in events],
            ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
        )

    def test_http(self):
        import_helper.import_module("http.client")
        returncode, events, stderr = self.run_python("test_http_client")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(events[0][0], "http.client.connect")
        self.assertEqual(events[0][2], "www.python.org 80")
        self.assertEqual(events[1][0], "http.client.send")
        # The payload is only checked when the script did not report
        # the '[cannot send]' placeholder.
        if events[1][2] != '[cannot send]':
            self.assertIn('HTTP', events[1][2])

    def test_sqlite3(self):
        sqlite3 = import_helper.import_module("sqlite3")
        returncode, events, stderr = self.run_python("test_sqlite3")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        actual = [ev[0] for ev in events]
        expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2

        # Extension-loading events are only expected when this sqlite3
        # build exposes Connection.enable_load_extension.
        if hasattr(sqlite3.Connection, "enable_load_extension"):
            expected += [
                "sqlite3.enable_load_extension",
                "sqlite3.load_extension",
            ]
        self.assertEqual(actual, expected)

    def test_sys_getframe(self):
        returncode, events, stderr = self.run_python("test_sys_getframe")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        actual = [(ev[0], ev[2]) for ev in events]
        expected = [("sys._getframe", "test_sys_getframe")]

        self.assertEqual(actual, expected)

    def test_syslog(self):
        syslog = import_helper.import_module("syslog")

        returncode, events, stderr = self.run_python("test_syslog")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print('Events:', *events, sep='\n  ')

        self.assertSequenceEqual(
            events,
            [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
             ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
             ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
             ('syslog.closelog', '', ''),
             ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
             ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
             ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
             ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
             ('syslog.closelog', '', '')]
        )

    def test_not_in_gc(self):
        returncode, _, stderr = self.run_python("test_not_in_gc")
        if returncode:
            self.fail(stderr)


if __name__ == "__main__":
    unittest.main()