1import dis 2import os.path 3import re 4import subprocess 5import sys 6import types 7import unittest 8 9from test import support 10from test.support import findfile 11 12 13if not support.has_subprocess_support: 14 raise unittest.SkipTest("test module requires subprocess") 15 16 17def abspath(filename): 18 return os.path.abspath(findfile(filename, subdir="dtracedata")) 19 20 21def normalize_trace_output(output): 22 """Normalize DTrace output for comparison. 23 24 DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers 25 are concatenated. So if the operating system moves our thread around, the 26 straight result can be "non-causal". So we add timestamps to the probe 27 firing, sort by that field, then strip it from the output""" 28 29 # When compiling with '--with-pydebug', strip '[# refs]' debug output. 30 output = re.sub(r"\[[0-9]+ refs\]", "", output) 31 try: 32 result = [ 33 row.split("\t") 34 for row in output.splitlines() 35 if row and not row.startswith('#') 36 ] 37 result.sort(key=lambda row: int(row[0])) 38 result = [row[1] for row in result] 39 return "\n".join(result) 40 except (IndexError, ValueError): 41 raise AssertionError( 42 "tracer produced unparsable output:\n{}".format(output) 43 ) 44 45 46class TraceBackend: 47 EXTENSION = None 48 COMMAND = None 49 COMMAND_ARGS = [] 50 51 def run_case(self, name, optimize_python=None): 52 actual_output = normalize_trace_output(self.trace_python( 53 script_file=abspath(name + self.EXTENSION), 54 python_file=abspath(name + ".py"), 55 optimize_python=optimize_python)) 56 57 with open(abspath(name + self.EXTENSION + ".expected")) as f: 58 expected_output = f.read().rstrip() 59 60 return (expected_output, actual_output) 61 62 def generate_trace_command(self, script_file, subcommand=None): 63 command = self.COMMAND + [script_file] 64 if subcommand: 65 command += ["-c", subcommand] 66 return command 67 68 def trace(self, script_file, subcommand=None): 69 command = self.generate_trace_command(script_file, subcommand) 70 stdout, _ = subprocess.Popen(command, 71 stdout=subprocess.PIPE, 72 stderr=subprocess.STDOUT, 73 universal_newlines=True).communicate() 74 return stdout 75 76 def trace_python(self, script_file, python_file, optimize_python=None): 77 python_flags = [] 78 if optimize_python: 79 python_flags.extend(["-O"] * optimize_python) 80 subcommand = " ".join([sys.executable] + python_flags + [python_file]) 81 return self.trace(script_file, subcommand) 82 83 def assert_usable(self): 84 try: 85 output = self.trace(abspath("assert_usable" + self.EXTENSION)) 86 output = output.strip() 87 except (FileNotFoundError, NotADirectoryError, PermissionError) as fnfe: 88 output = str(fnfe) 89 if output != "probe: success": 90 raise unittest.SkipTest( 91 "{}(1) failed: {}".format(self.COMMAND[0], output) 92 ) 93 94 95class DTraceBackend(TraceBackend): 96 EXTENSION = ".d" 97 COMMAND = ["dtrace", "-q", "-s"] 98 99 100class SystemTapBackend(TraceBackend): 101 EXTENSION = ".stp" 102 COMMAND = ["stap", "-g"] 103 104 105class TraceTests: 106 # unittest.TestCase options 107 maxDiff = None 108 109 # TraceTests options 110 backend = None 111 optimize_python = 0 112 113 @classmethod 114 def setUpClass(self): 115 self.backend.assert_usable() 116 117 def run_case(self, name): 118 actual_output, expected_output = self.backend.run_case( 119 name, optimize_python=self.optimize_python) 120 self.assertEqual(actual_output, expected_output) 121 122 def test_function_entry_return(self): 123 self.run_case("call_stack") 124 125 def test_verify_call_opcodes(self): 126 """Ensure our call stack test hits all function call opcodes""" 127 128 opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"]) 129 130 with open(abspath("call_stack.py")) as f: 131 code_string = f.read() 132 133 def get_function_instructions(funcname): 134 # Recompile with appropriate optimization setting 135 code = compile(source=code_string, 136 filename="<string>", 137 mode="exec", 138 optimize=self.optimize_python) 139 140 for c in code.co_consts: 141 if isinstance(c, types.CodeType) and c.co_name == funcname: 142 return dis.get_instructions(c) 143 return [] 144 145 for instruction in get_function_instructions('start'): 146 opcodes.discard(instruction.opname) 147 148 self.assertEqual(set(), opcodes) 149 150 def test_gc(self): 151 self.run_case("gc") 152 153 def test_line(self): 154 self.run_case("line") 155 156 157class DTraceNormalTests(TraceTests, unittest.TestCase): 158 backend = DTraceBackend() 159 optimize_python = 0 160 161 162class DTraceOptimizedTests(TraceTests, unittest.TestCase): 163 backend = DTraceBackend() 164 optimize_python = 2 165 166 167class SystemTapNormalTests(TraceTests, unittest.TestCase): 168 backend = SystemTapBackend() 169 optimize_python = 0 170 171 172class SystemTapOptimizedTests(TraceTests, unittest.TestCase): 173 backend = SystemTapBackend() 174 optimize_python = 2 175 176 177if __name__ == '__main__': 178 unittest.main() 179