1import dis
2import os.path
3import re
4import subprocess
5import sys
6import types
7import unittest
8
9from test import support
10from test.support import findfile
11
12
# Every tracer in this module is launched through subprocess, so skip the
# whole module when test.support reports that subprocess is unavailable.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")
15
16
def abspath(filename):
    """Return the absolute path of a tracing test-data file.

    *filename* is looked up with test.support.findfile() using the
    "dtracedata" subdirectory, and whatever findfile() returns is then
    converted to an absolute path.
    """
    located = findfile(filename, subdir="dtracedata")
    return os.path.abspath(located)
19
20
def normalize_trace_output(output):
    """Normalize DTrace output for comparison.

    DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
    are concatenated. So if the operating system moves our thread around, the
    straight result can be "non-causal". So we add timestamps to the probe
    firing, sort by that field, then strip it from the output.

    Every non-empty line that does not start with '#' is expected to have the
    form "<integer timestamp><TAB><text>". Lines are ordered by timestamp
    (the sort is stable, so equal timestamps keep their input order) and only
    the second tab-separated field of each line is kept; any further fields
    are dropped.

    Returns the kept fields joined with newlines.

    Raises AssertionError, chained from the underlying IndexError or
    ValueError, when a line has no tab-separated second field or its first
    field is not an integer.
    """

    # When compiling with '--with-pydebug', strip '[# refs]' debug output.
    output = re.sub(r"\[[0-9]+ refs\]", "", output)
    try:
        rows = [
            row.split("\t")
            for row in output.splitlines()
            if row and not row.startswith('#')
        ]
        rows.sort(key=lambda row: int(row[0]))
        return "\n".join(row[1] for row in rows)
    except (IndexError, ValueError) as exc:
        # Chain explicitly so the report shows the parse error as the direct
        # cause rather than as an unrelated error raised during handling.
        raise AssertionError(
            "tracer produced unparsable output:\n{}".format(output)
        ) from exc
44
45
class TraceBackend:
    """Base class that runs an external tracer over the test-data scripts.

    Subclasses set EXTENSION (the tracer script suffix, e.g. ".d") and
    COMMAND (the argv prefix, as a list, that the script path is appended to).
    """

    EXTENSION = None
    COMMAND = None
    # Not referenced by any code visible in this file.
    COMMAND_ARGS = []

    def run_case(self, name, optimize_python=None):
        """Trace the test case *name* and return (expected, actual) output.

        The tracer script is "<name><EXTENSION>", the traced program is
        "<name>.py" and the expected output is read from
        "<name><EXTENSION>.expected" with trailing whitespace removed. The
        actual output is passed through normalize_trace_output().
        """
        script_path = abspath(name + self.EXTENSION)
        python_path = abspath(name + ".py")
        raw_output = self.trace_python(
            script_file=script_path,
            python_file=python_path,
            optimize_python=optimize_python,
        )
        actual_output = normalize_trace_output(raw_output)

        expected_path = abspath(name + self.EXTENSION + ".expected")
        with open(expected_path) as expected_file:
            expected_output = expected_file.read().rstrip()

        return (expected_output, actual_output)

    def generate_trace_command(self, script_file, subcommand=None):
        """Return the tracer argv: COMMAND, the script, and optional "-c" part.

        A new list is built on every call, so COMMAND itself is not modified.
        """
        argv = self.COMMAND + [script_file]
        if not subcommand:
            return argv
        argv.extend(["-c", subcommand])
        return argv

    def trace(self, script_file, subcommand=None):
        """Run the tracer and return its output as text.

        stderr is redirected into stdout, so the returned string contains
        both streams.
        """
        process = subprocess.Popen(
            self.generate_trace_command(script_file, subcommand),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        combined_output, _unused = process.communicate()
        return combined_output

    def trace_python(self, script_file, python_file, optimize_python=None):
        """Trace sys.executable running *python_file* and return the output.

        A truthy *optimize_python* adds that many "-O" flags. The interpreter
        command is passed to trace() as one space-joined string.
        """
        argv = [sys.executable]
        if optimize_python:
            argv += ["-O"] * optimize_python
        argv.append(python_file)
        return self.trace(script_file, " ".join(argv))

    def assert_usable(self):
        """Raise unittest.SkipTest unless the tracer passes a smoke test.

        The "assert_usable<EXTENSION>" script must produce exactly
        "probe: success" (ignoring surrounding whitespace). If launching the
        tracer fails with one of the OS errors caught below, the error text
        is reported as the output instead.
        """
        try:
            output = self.trace(abspath("assert_usable" + self.EXTENSION)).strip()
        except (FileNotFoundError, NotADirectoryError, PermissionError) as exc:
            output = str(exc)
        if output == "probe: success":
            return
        raise unittest.SkipTest(
            "{}(1) failed: {}".format(self.COMMAND[0], output)
        )
93
94
class DTraceBackend(TraceBackend):
    """Backend for ".d" scripts, run as ``dtrace -q -s <script>``."""

    COMMAND = ["dtrace", "-q", "-s"]
    EXTENSION = ".d"
98
99
class SystemTapBackend(TraceBackend):
    """Backend for ".stp" scripts, run as ``stap -g <script>``."""

    COMMAND = ["stap", "-g"]
    EXTENSION = ".stp"
103
104
class TraceTests:
    """Mixin holding the tracing tests shared by every backend.

    Concrete test classes combine this mixin with unittest.TestCase and set
    ``backend`` to a TraceBackend instance and ``optimize_python`` to the
    optimization level used for the traced interpreter.
    """

    # unittest.TestCase options
    maxDiff = None

    # TraceTests options
    backend = None
    optimize_python = 0

    @classmethod
    def setUpClass(cls):
        # assert_usable() raises unittest.SkipTest when the tracer cannot be
        # run, which skips every test of the class.
        cls.backend.assert_usable()

    def run_case(self, name):
        """Trace the test case *name* and compare actual to expected output."""
        # TraceBackend.run_case() returns (expected, actual), in that order;
        # unpack accordingly so a failure diff is labelled the right way.
        expected_output, actual_output = self.backend.run_case(
            name, optimize_python=self.optimize_python)
        self.assertEqual(actual_output, expected_output)

    def test_function_entry_return(self):
        self.run_case("call_stack")

    def test_verify_call_opcodes(self):
        """Ensure our call stack test hits all function call opcodes"""

        opcodes = {"CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"}

        with open(abspath("call_stack.py")) as f:
            code_string = f.read()

        def get_function_instructions(funcname):
            # Recompile with appropriate optimization setting
            code = compile(source=code_string,
                           filename="<string>",
                           mode="exec",
                           optimize=self.optimize_python)

            # Only code objects stored directly in the module's constants
            # are searched.
            for c in code.co_consts:
                if isinstance(c, types.CodeType) and c.co_name == funcname:
                    return dis.get_instructions(c)
            return []

        # Whatever is left in the set was never emitted by 'start'.
        for instruction in get_function_instructions('start'):
            opcodes.discard(instruction.opname)

        self.assertEqual(set(), opcodes)

    def test_gc(self):
        self.run_case("gc")

    def test_line(self):
        self.run_case("line")
155
156
class DTraceNormalTests(TraceTests, unittest.TestCase):
    """TraceTests run through DTraceBackend with no "-O" flags."""

    optimize_python = 0
    backend = DTraceBackend()
160
161
class DTraceOptimizedTests(TraceTests, unittest.TestCase):
    """TraceTests run through DTraceBackend at optimization level 2."""

    optimize_python = 2
    backend = DTraceBackend()
165
166
class SystemTapNormalTests(TraceTests, unittest.TestCase):
    """TraceTests run through SystemTapBackend with no "-O" flags."""

    optimize_python = 0
    backend = SystemTapBackend()
170
171
class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
    """TraceTests run through SystemTapBackend at optimization level 2."""

    optimize_python = 2
    backend = SystemTapBackend()
175
176
# Hand control to unittest's command-line runner when executed as a script.
if __name__ == '__main__':
    unittest.main()
179