1"""Running tests"""
2
3import sys
4import time
5import warnings
6
7from . import result
8from .case import _SubTest
9from .signals import registerResult
10
11__unittest = True
12
13
class _WritelnDecorator(object):
    """Wrap a file-like object and give it an extra 'writeln' method.

    Any attribute that is not found on the wrapper itself is looked up on
    the wrapped stream, so the wrapper can stand in for the stream.
    """

    def __init__(self, stream):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails.  Refusing
        # 'stream' here prevents endless recursion on an instance whose
        # __init__ has not run (reading self.stream below would re-enter
        # this method); '__getstate__' is likewise never forwarded to the
        # wrapped stream.
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        wrapped = self.stream
        return getattr(wrapped, attr)

    def writeln(self, arg=None):
        """Write *arg* if it is truthy, then write a newline."""
        if arg:
            self.write(arg)
        # text-mode streams translate to \r\n if needed
        self.write('\n')
28
29
class TextTestResult(result.TestResult):
    """A test result class that can print formatted text results to a stream.

    Used by TextTestRunner.

    What is written while tests run depends on the verbosity given to the
    constructor: above 1 every outcome gets a "<description> ... <status>"
    line, at exactly 1 a single character is written per outcome, and for
    any other value nothing is written until printErrors() is called.
    """
    separator1 = '=' * 70
    separator2 = '-' * 70

    def __init__(self, stream, descriptions, verbosity):
        """Remember the output stream and derive the reporting mode.

        *stream* must provide write(), writeln() and flush().  When
        *descriptions* is true, a test's short description is shown along
        with its name.
        """
        super(TextTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.showAll = verbosity > 1
        self.dots = verbosity == 1
        self.descriptions = descriptions
        # True while the stream is at the start of a line, i.e. there is no
        # "<description> ... " prefix still waiting for its status.
        self._newline = True

    def getDescription(self, test):
        """Return the text used to identify *test* in the output.

        This is str(test), followed on a second line by the test's short
        description when descriptions are enabled and the test has one.
        """
        doc_first_line = test.shortDescription()
        if self.descriptions and doc_first_line:
            return '\n'.join((str(test), doc_first_line))
        else:
            return str(test)

    def startTest(self, test):
        """In verbose mode, write "<description> ... " and leave the line open."""
        super(TextTestResult, self).startTest(test)
        if self.showAll:
            self.stream.write(self.getDescription(test))
            self.stream.write(" ... ")
            self.stream.flush()
            self._newline = False

    def _write_status(self, test, status):
        """Write *status* for *test* and finish the line (verbose mode).

        If the line opened by startTest() is still pending and *test* is not
        a subtest, only the status is appended to it.  Otherwise the
        description is written first: after a line break when a line is
        pending, and indented by two spaces for subtests.
        """
        is_subtest = isinstance(test, _SubTest)
        if is_subtest or self._newline:
            if not self._newline:
                self.stream.writeln()
            if is_subtest:
                self.stream.write("  ")
            self.stream.write(self.getDescription(test))
            self.stream.write(" ... ")
        self.stream.writeln(status)
        self.stream.flush()
        self._newline = True

    def addSubTest(self, test, subtest, err):
        """Report a failed or errored subtest.

        *err* is an exc_info-style tuple, or None for a subtest that passed;
        passing subtests produce no output.  An exception type derived from
        subtest.failureException counts as a failure, anything else as an
        error.
        """
        if err is not None:
            if self.showAll:
                if issubclass(err[0], subtest.failureException):
                    self._write_status(subtest, "FAIL")
                else:
                    self._write_status(subtest, "ERROR")
            elif self.dots:
                if issubclass(err[0], subtest.failureException):
                    self.stream.write('F')
                else:
                    self.stream.write('E')
                self.stream.flush()
        super(TextTestResult, self).addSubTest(test, subtest, err)

    def addSuccess(self, test):
        """Report a passing test ("ok" or ".")."""
        super(TextTestResult, self).addSuccess(test)
        if self.showAll:
            self._write_status(test, "ok")
        elif self.dots:
            self.stream.write('.')
            self.stream.flush()

    def addError(self, test, err):
        """Report a test that raised an unexpected exception ("ERROR" or "E")."""
        super(TextTestResult, self).addError(test, err)
        if self.showAll:
            self._write_status(test, "ERROR")
        elif self.dots:
            self.stream.write('E')
            self.stream.flush()

    def addFailure(self, test, err):
        """Report a failed test ("FAIL" or "F")."""
        super(TextTestResult, self).addFailure(test, err)
        if self.showAll:
            self._write_status(test, "FAIL")
        elif self.dots:
            self.stream.write('F')
            self.stream.flush()

    def addSkip(self, test, reason):
        """Report a skipped test together with the repr of *reason* (or "s")."""
        super(TextTestResult, self).addSkip(test, reason)
        if self.showAll:
            self._write_status(test, "skipped {0!r}".format(reason))
        elif self.dots:
            self.stream.write("s")
            self.stream.flush()

    def addExpectedFailure(self, test, err):
        """Report a test that failed as expected ("expected failure" or "x")."""
        super(TextTestResult, self).addExpectedFailure(test, err)
        if self.showAll:
            # Go through _write_status() like the other outcomes so that
            # _newline is reset once the line is finished; otherwise a
            # status reported before the next startTest() would be printed
            # without its description.
            self._write_status(test, "expected failure")
        elif self.dots:
            self.stream.write("x")
            self.stream.flush()

    def addUnexpectedSuccess(self, test):
        """Report a test that passed although it was expected to fail."""
        super(TextTestResult, self).addUnexpectedSuccess(test)
        if self.showAll:
            # See addExpectedFailure(): keeps _newline in step with the
            # stream position.
            self._write_status(test, "unexpected success")
        elif self.dots:
            self.stream.write("u")
            self.stream.flush()

    def printErrors(self):
        """Write the details of all errors, failures and unexpected successes."""
        if self.dots or self.showAll:
            # Finish the progress output before the detailed report starts.
            self.stream.writeln()
            self.stream.flush()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)
        # Read defensively: the attribute is treated as optional here.
        unexpectedSuccesses = getattr(self, 'unexpectedSuccesses', ())
        if unexpectedSuccesses:
            self.stream.writeln(self.separator1)
            for test in unexpectedSuccesses:
                self.stream.writeln(f"UNEXPECTED SUCCESS: {self.getDescription(test)}")
            self.stream.flush()

    def printErrorList(self, flavour, errors):
        """Write one block per (test, err) pair in *errors*.

        Each block consists of separator1, a "<flavour>: <description>"
        heading, separator2 and the text of *err*.
        """
        for test, err in errors:
            self.stream.writeln(self.separator1)
            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
            self.stream.writeln(self.separator2)
            self.stream.writeln("%s" % err)
            self.stream.flush()
159
160
class TextTestRunner(object):
    """A test runner class that displays results in textual form.

    It prints out the names of tests as they are run, errors as they
    occur, and a summary of the results at the end of the test run.
    """
    resultclass = TextTestResult

    def __init__(self, stream=None, descriptions=True, verbosity=1,
                 failfast=False, buffer=False, resultclass=None, warnings=None,
                 *, tb_locals=False):
        """Construct a TextTestRunner.

        Subclasses should accept **kwargs to ensure compatibility as the
        interface changes.

        *stream* defaults to sys.stderr and is wrapped so that it offers a
        writeln() method.  *descriptions* and *verbosity* are handed to the
        result class; *failfast*, *buffer* and *tb_locals* are copied onto
        the result object by run().  *resultclass*, when given, replaces the
        class-level default.  *warnings*, when truthy, is the action passed
        to warnings.simplefilter() for the duration of run().
        """
        if stream is None:
            stream = sys.stderr
        self.stream = _WritelnDecorator(stream)
        self.descriptions = descriptions
        self.verbosity = verbosity
        self.failfast = failfast
        self.buffer = buffer
        self.tb_locals = tb_locals
        self.warnings = warnings
        if resultclass is not None:
            self.resultclass = resultclass

    def _makeResult(self):
        """Create the result object that run() fills in."""
        return self.resultclass(self.stream, self.descriptions, self.verbosity)

    def run(self, test):
        """Run the given test case or test suite.

        *test* is called with a freshly created result object.  Afterwards
        the collected errors, the number of tests run, the elapsed time and
        an "OK" or "FAILED" summary line are written to the stream.
        Returns the result object.
        """
        result = self._makeResult()
        registerResult(result)
        result.failfast = self.failfast
        result.buffer = self.buffer
        result.tb_locals = self.tb_locals
        with warnings.catch_warnings():
            if self.warnings:
                # if self.warnings is set, use it to filter all the warnings
                warnings.simplefilter(self.warnings)
                # if the filter is 'default' or 'always', special-case the
                # warnings from the deprecated unittest methods to show them
                # no more than once per module, because they can be fairly
                # noisy.  The -Wd and -Wa flags can be used to bypass this
                # only when self.warnings is None.
                if self.warnings in ['default', 'always']:
                    warnings.filterwarnings('module',
                            category=DeprecationWarning,
                            message=r'Please use assert\w+ instead.')
            startTime = time.perf_counter()
            # startTestRun/stopTestRun are treated as optional hooks of the
            # result object.
            startTestRun = getattr(result, 'startTestRun', None)
            if startTestRun is not None:
                startTestRun()
            try:
                test(result)
            finally:
                # Runs even when test() raises, so the result is always
                # told that the run is over.
                stopTestRun = getattr(result, 'stopTestRun', None)
                if stopTestRun is not None:
                    stopTestRun()
            stopTime = time.perf_counter()
        timeTaken = stopTime - startTime
        result.printErrors()
        if hasattr(result, 'separator2'):
            self.stream.writeln(result.separator2)
        run = result.testsRun
        plural = "" if run == 1 else "s"
        self.stream.writeln("Ran %d test%s in %.3fs" %
                            (run, plural, timeTaken))
        self.stream.writeln()

        # Optional attributes of the result object.  Each one is looked up
        # on its own so that a result lacking one of them still has the
        # others reported in the summary.
        expectedFails = len(getattr(result, 'expectedFailures', ()))
        unexpectedSuccesses = len(getattr(result, 'unexpectedSuccesses', ()))
        skipped = len(getattr(result, 'skipped', ()))

        infos = []
        if not result.wasSuccessful():
            self.stream.write("FAILED")
            failed, errored = len(result.failures), len(result.errors)
            if failed:
                infos.append("failures=%d" % failed)
            if errored:
                infos.append("errors=%d" % errored)
        else:
            self.stream.write("OK")
        if skipped:
            infos.append("skipped=%d" % skipped)
        if expectedFails:
            infos.append("expected failures=%d" % expectedFails)
        if unexpectedSuccesses:
            infos.append("unexpected successes=%d" % unexpectedSuccesses)
        if infos:
            self.stream.writeln(" (%s)" % (", ".join(infos),))
        else:
            self.stream.write("\n")
        self.stream.flush()
        return result
264