1"""Test result object"""
2
3import io
4import sys
5import traceback
6
7from . import util
8from functools import wraps
9
# Marker read by TestResult._is_relevant_tb_level(): a traceback frame whose
# module globals contain the name '__unittest' is treated as a test-runner
# frame and skipped when tracebacks are cleaned for reporting.
__unittest = True
11
def failfast(method):
    """Decorate a result method so that it honours the ``failfast`` flag.

    The returned wrapper first checks the instance's ``failfast`` attribute
    (treated as false when the attribute is missing); if it is true,
    ``self.stop()`` is called.  The wrapped method is then invoked with the
    original arguments and its return value is passed back unchanged.
    """
    @wraps(method)
    def wrapper(self, *positional, **keywords):
        stop_requested = getattr(self, 'failfast', False)
        if stop_requested:
            # Ask the runner to abort before recording the outcome.
            self.stop()
        outcome = method(self, *positional, **keywords)
        return outcome
    return wrapper
19
# %-format templates used by TestResult to label captured stdout / stderr
# text, both when it is appended to a formatted traceback and when it is
# echoed to the original streams after a test.
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
22
23
class TestResult(object):
    """Holder for test result information.

    Test results are automatically managed by the TestCase and TestSuite
    classes, and do not need to be explicitly manipulated by writers of tests.

    Each instance holds the total number of tests run, and collections of
    failures and errors that occurred among those test runs. The collections
    contain tuples of (testcase, exceptioninfo), where exceptioninfo is the
    formatted traceback of the error that occurred.
    """
    # Class-level defaults.  They are not read anywhere in this class;
    # presumably they are bookkeeping for the suite/runner machinery
    # (TODO confirm against the callers).
    _previousTestClass = None
    _testRunEntered = False
    _moduleSetUpFailed = False

    def __init__(self, stream=None, descriptions=None, verbosity=None):
        """Create an empty result.

        The 'stream', 'descriptions' and 'verbosity' arguments are accepted
        but not used by this class.
        """
        self.failfast = False
        self.failures = []
        self.errors = []
        self.testsRun = 0
        self.skipped = []
        self.expectedFailures = []
        self.unexpectedSuccesses = []
        self.shouldStop = False
        self.buffer = False
        self.tb_locals = False
        # The capture buffers are created lazily by _setupStdout().
        self._stdout_buffer = None
        self._stderr_buffer = None
        # Streams that were current when the result was created; they are
        # what _restoreStdout() puts back.
        self._original_stdout = sys.stdout
        self._original_stderr = sys.stderr
        self._mirrorOutput = False

    def printErrors(self):
        "Called by TestRunner after test run"

    def startTest(self, test):
        "Called when the given test is about to be run"
        self.testsRun += 1
        self._mirrorOutput = False
        self._setupStdout()

    def _setupStdout(self):
        """Redirect sys.stdout/sys.stderr into the capture buffers.

        Does nothing unless 'buffer' is true.  The two StringIO buffers are
        created on first use and reused afterwards.
        """
        if self.buffer:
            if self._stderr_buffer is None:
                self._stderr_buffer = io.StringIO()
                self._stdout_buffer = io.StringIO()
            sys.stdout = self._stdout_buffer
            sys.stderr = self._stderr_buffer

    def startTestRun(self):
        """Called once before any tests are executed.

        See startTest for a method called before each test.
        """

    def stopTest(self, test):
        """Called when the given test has been run"""
        self._restoreStdout()
        self._mirrorOutput = False

    @staticmethod
    def _format_captured(template, text):
        """Return 'text' substituted into 'template', newline-terminated.

        A trailing newline is appended to 'text' first when it lacks one, so
        the labelled section always ends on a line boundary.
        """
        if not text.endswith('\n'):
            text += '\n'
        return template % text

    def _restoreStdout(self):
        """Undo _setupStdout(): restore the original streams, reset buffers.

        Does nothing unless 'buffer' is true.  When '_mirrorOutput' is set
        (an error or failure was recorded), the captured text is first
        echoed to the original streams.
        """
        if not self.buffer:
            return
        if self._stdout_buffer is None or self._stderr_buffer is None:
            # 'buffer' was enabled after the last _setupStdout() call, so
            # this result never redirected the streams: there is nothing to
            # mirror, restore or truncate (and calling seek() on None would
            # raise AttributeError).
            return

        if self._mirrorOutput:
            # Read both streams before writing anything.
            output = sys.stdout.getvalue()
            error = sys.stderr.getvalue()
            if output:
                self._original_stdout.write(
                    self._format_captured(STDOUT_LINE, output))
            if error:
                self._original_stderr.write(
                    self._format_captured(STDERR_LINE, error))

        sys.stdout = self._original_stdout
        sys.stderr = self._original_stderr
        # Empty the buffers so the next test starts with clean captures.
        self._stdout_buffer.seek(0)
        self._stdout_buffer.truncate()
        self._stderr_buffer.seek(0)
        self._stderr_buffer.truncate()

    def stopTestRun(self):
        """Called once after all tests are executed.

        See stopTest for a method called after each test.
        """

    @failfast
    def addError(self, test, err):
        """Called when an error has occurred. 'err' is a tuple of values as
        returned by sys.exc_info().
        """
        self.errors.append((test, self._exc_info_to_string(err, test)))
        self._mirrorOutput = True

    @failfast
    def addFailure(self, test, err):
        """Called when a failure has occurred. 'err' is a tuple of values as
        returned by sys.exc_info()."""
        self.failures.append((test, self._exc_info_to_string(err, test)))
        self._mirrorOutput = True

    def addSubTest(self, test, subtest, err):
        """Called at the end of a subtest.
        'err' is None if the subtest ended successfully, otherwise it's a
        tuple of values as returned by sys.exc_info().
        """
        # By default, we don't do anything with successful subtests, but
        # more sophisticated test results might want to record them.
        if err is not None:
            if getattr(self, 'failfast', False):
                self.stop()
            # Exceptions derived from the test's failureException count as
            # failures; everything else is an error.
            if issubclass(err[0], test.failureException):
                errors = self.failures
            else:
                errors = self.errors
            errors.append((subtest, self._exc_info_to_string(err, test)))
            self._mirrorOutput = True

    def addSuccess(self, test):
        "Called when a test has completed successfully"
        pass

    def addSkip(self, test, reason):
        """Called when a test is skipped."""
        self.skipped.append((test, reason))

    def addExpectedFailure(self, test, err):
        """Called when an expected failure/error occurred."""
        self.expectedFailures.append(
            (test, self._exc_info_to_string(err, test)))

    @failfast
    def addUnexpectedSuccess(self, test):
        """Called when a test was expected to fail, but succeeded."""
        self.unexpectedSuccesses.append(test)

    def wasSuccessful(self):
        """Tells whether or not this result was a success."""
        # The hasattr check is for test_result's OldResult test.  That
        # way this method works on objects that lack the attribute.
        # (where would such result instances come from? old stored pickles?)
        return ((len(self.failures) == len(self.errors) == 0) and
                (not hasattr(self, 'unexpectedSuccesses') or
                 len(self.unexpectedSuccesses) == 0))

    def stop(self):
        """Indicates that the tests should be aborted."""
        self.shouldStop = True

    def _exc_info_to_string(self, err, test):
        """Converts a sys.exc_info()-style tuple of values into a string.

        The traceback is cleaned with _clean_tracebacks() before it is
        formatted.  When 'buffer' is true, any text currently held by
        sys.stdout / sys.stderr is appended in labelled sections.
        """
        exctype, value, tb = err
        tb = self._clean_tracebacks(exctype, value, tb, test)
        tb_e = traceback.TracebackException(
            exctype, value, tb,
            capture_locals=self.tb_locals, compact=True)
        msgLines = list(tb_e.format())

        if self.buffer:
            output = sys.stdout.getvalue()
            error = sys.stderr.getvalue()
            if output:
                msgLines.append(self._format_captured(STDOUT_LINE, output))
            if error:
                msgLines.append(self._format_captured(STDERR_LINE, error))
        return ''.join(msgLines)

    def _clean_tracebacks(self, exctype, value, tb, test):
        """Strip test-runner frames from an exception and its chain.

        Walks the exception together with every exception reachable through
        __cause__ / __context__.  For each one, leading frames flagged by
        _is_relevant_tb_level() are skipped, and if its type is exactly
        test.failureException the traceback is cut at the first such frame
        after the user code.  Chained exceptions get the cleaned traceback
        assigned to their __traceback__; the cleaned traceback of the
        top-level exception is returned.
        """
        ret = None
        first = True
        excs = [(exctype, value, tb)]
        seen = {id(value)}  # Detect loops in chained exceptions.
        while excs:
            (exctype, value, tb) = excs.pop()
            # Skip test runner traceback levels
            while tb and self._is_relevant_tb_level(tb):
                tb = tb.tb_next

            # Skip assert*() traceback levels
            if exctype is test.failureException:
                self._remove_unittest_tb_frames(tb)

            if first:
                # The top-level traceback is handed back to the caller
                # rather than written onto the exception object.
                ret = tb
                first = False
            else:
                value.__traceback__ = tb

            if value is not None:
                for c in (value.__cause__, value.__context__):
                    if c is not None and id(c) not in seen:
                        excs.append((type(c), c, c.__traceback__))
                        seen.add(id(c))
        return ret

    def _is_relevant_tb_level(self, tb):
        """Return True if the frame's globals define '__unittest'."""
        return '__unittest' in tb.tb_frame.f_globals

    def _remove_unittest_tb_frames(self, tb):
        '''Truncates usercode tb at the first unittest frame.

        If the first frame of the traceback is in user code, the traceback
        is cut in place just before the first unittest frame, so only the
        user-code prefix remains.  If the first frame is already in the
        unittest module, the traceback is not modified.  Nothing is
        returned.
        '''
        prev = None
        while tb and not self._is_relevant_tb_level(tb):
            prev = tb
            tb = tb.tb_next
        if prev is not None:
            prev.tb_next = None

    def __repr__(self):
        """Return a summary with the class name and run/error/failure counts."""
        return ("<%s run=%i errors=%i failures=%i>" %
               (util.strclass(self.__class__), self.testsRun, len(self.errors),
                len(self.failures)))
245