xref: /aosp_15_r20/tools/asuite/atest/test_runners/event_handler.py (revision c2e18aaa1096c836b086f94603d04f4eb9cf37f5)
1# Copyright 2019, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Atest test event handler class."""
16
17from __future__ import print_function
18
19from collections import deque
20from datetime import timedelta
21import logging
22import time
23
24from atest import atest_execution_info
25from atest import atest_utils
26from atest import result_reporter
27from atest.test_runners import test_runner_base
28
29
# Short alias -> event name string, as compared against the `event_name`
# argument of EventHandler.process_event().
EVENT_NAMES = dict(
    # Module-level and run-level lifecycle events.
    module_started='TEST_MODULE_STARTED',
    module_ended='TEST_MODULE_ENDED',
    run_started='TEST_RUN_STARTED',
    run_ended='TEST_RUN_ENDED',
    # Test-level events.
    test_started='TEST_STARTED',
    test_failed='TEST_FAILED',
    test_ended='TEST_ENDED',
    invocation_ended='INVOCATION_ENDED',
    # These two failures are runner-level, not test-level.
    # Invocation failure is broader than run failure.
    run_failed='TEST_RUN_FAILED',
    invocation_failed='INVOCATION_FAILED',
    # Further test-level markers, plus log association.
    test_ignored='TEST_IGNORED',
    test_assumption_failure='TEST_ASSUMPTION_FAILURE',
    log_association='LOG_ASSOCIATION',
)

# START event name -> the END event name that is expected to close it.
EVENT_PAIRS = {
    EVENT_NAMES[scope + '_started']: EVENT_NAMES[scope + '_ended']
    for scope in ('module', 'run', 'test')
}
START_EVENTS = list(EVENT_PAIRS)
END_EVENTS = list(EVENT_PAIRS.values())
# Filled with (className, testName) to build a full test name.
TEST_NAME_TEMPLATE = '%s#%s'
57
# Duration thresholds, expressed in milliseconds.
ONE_SECOND = 1000
ONE_MINUTE = 60 * ONE_SECOND
ONE_HOUR = 60 * ONE_MINUTE
62
# Initial per-handler tracking state; EventHandler works on a copy of it.
CONNECTION_STATE = dict(
    # Full name ('className#testName') of the most recently started test.
    current_test=None,
    # 'runName' of the current test run.
    test_run_name=None,
    # {'name': ..., 'trace': ...} of the latest TEST_FAILED event.
    last_failed=None,
    # Full name of the latest ignored test.
    last_ignored=None,
    # Full name of the latest test with an assumption failure.
    last_assumption_failed=None,
    # 'moduleName' of the current module.
    current_group=None,
    # 'testCount' announced by the current test run.
    current_group_total=None,
    # Number of tests started so far in the current run.
    test_count=0,
    # 'start_time' of the most recently started test.
    test_start_time=None,
)
74
75
class EventHandleError(Exception):
  """Signals an event sequence that the event handler cannot process.

  EventHandler raises it when START and END events are unbalanced in a way
  it has no recovery for.
  """
78
79
class EventHandler:
  """Test Event handle class.

  Consumes (event_name, event_data) pairs through process_event(), tracks the
  module / run / test currently in flight in `self.state`, and forwards
  test_runner_base.TestResult objects to the reporter.

  Attributes:
      reporter: Object whose process_test_result() receives every result.
      runner_name: Value used as `runner_name` in every TestResult.
      state: Per-instance copy of CONNECTION_STATE.
      event_stack: Stack of START event names still waiting for their END.
      log_associations: Payloads of LOG_ASSOCIATION events, in arrival order.
      run_num: 'runAttempt' of the latest TEST_RUN_STARTED event (0 if absent).
  """

  def __init__(self, reporter, name):
    self.reporter = reporter
    self.runner_name = name
    self.state = CONNECTION_STATE.copy()
    self.event_stack = deque()
    self.log_associations = []
    self.run_num = 0

  @staticmethod
  def _full_test_name(event_data):
    """Returns 'className#testName' built from a test-level event payload."""
    return TEST_NAME_TEMPLATE % (
        event_data['className'],
        event_data['testName'],
    )

  def _reset_test_tracking(self):
    """Drops per-test bookkeeping when a new module or run begins.

    All three "last_*" markers are cleared together so that a marker left
    behind by a test that never ended cannot be matched against a test with
    the same name in a later run.
    """
    self.state['last_failed'] = None
    self.state['last_ignored'] = None
    self.state['last_assumption_failed'] = None
    self.state['current_test'] = None

  def _report_result(
      self, status, details, test_name, test_time='', additional_info=None
  ):
    """Sends one TestResult, filled from the current state, to the reporter.

    Args:
        status: One of the test_runner_base status constants.
        details: Failure trace / error message, or None.
        test_name: Test name to report.
        test_time: Formatted duration string; '' when unknown.
        additional_info: Dict of extra event data; None means an empty dict.
    """
    self.reporter.process_test_result(
        test_runner_base.TestResult(
            runner_name=self.runner_name,
            group_name=self.state['current_group'],
            test_name=test_name,
            status=status,
            details=details,
            test_count=self.state['test_count'],
            test_time=test_time,
            runner_total=None,
            group_total=self.state['current_group_total'],
            additional_info={} if additional_info is None else additional_info,
            test_run_name=self.state['test_run_name'],
        )
    )

  def _module_started(self, event_data):
    """Records the new module name and clears per-test bookkeeping."""
    # The first module start also stamps the end of the prepare phase.
    if atest_execution_info.PREPARE_END_TIME is None:
      atest_execution_info.PREPARE_END_TIME = time.time()
    self.state['current_group'] = event_data['moduleName']
    self._reset_test_tracking()

  def _run_started(self, event_data):
    """Records run attempt, run name and expected test count of a new run."""
    # Technically there can be more than one run per module.
    self.run_num = event_data.get('runAttempt', 0)
    self.state['test_run_name'] = event_data.setdefault('runName', '')
    self.state['current_group_total'] = event_data['testCount']
    self.state['test_count'] = 0
    self._reset_test_tracking()

  def _test_started(self, event_data):
    """Marks a test as current, counts it and remembers its start time."""
    self.state['current_test'] = self._full_test_name(event_data)
    self.state['test_count'] += 1
    self.state['test_start_time'] = event_data['start_time']

  def _test_failed(self, event_data):
    """Remembers name and trace of the failure until the test ends."""
    self.state['last_failed'] = {
        'name': self._full_test_name(event_data),
        'trace': event_data['trace'],
    }

  def _test_ignored(self, event_data):
    """Remembers the name of the ignored test until the test ends."""
    self.state['last_ignored'] = self._full_test_name(event_data)

  def _test_assumption_failure(self, event_data):
    """Remembers the name of the assumption-failed test until it ends."""
    self.state['last_assumption_failed'] = self._full_test_name(event_data)

  def _run_failed(self, event_data):
    """Reports an ERROR result carrying the run failure 'reason'."""
    # Module and Test Run probably started, but failure occurred.
    self._report_result(
        test_runner_base.ERROR_STATUS,
        event_data['reason'],
        self.state['current_test'],
    )

  def _invocation_failed(self, event_data):
    """Reports an ERROR result carrying the invocation failure 'cause'."""
    # Broadest possible failure. May not even start the module/test run.
    self._report_result(
        test_runner_base.ERROR_STATUS,
        event_data['cause'],
        self.state['current_test'],
    )

  def _invocation_ended(self, event_data):
    """Copies 'device_count' (None when absent) onto the reporter."""
    self.reporter.device_count = event_data.get('device_count')

  # pylint: disable=unused-argument
  def _run_ended(self, event_data):
    """Closes the current iteration on non-silent reporters."""
    # Renew ResultReport if is module level(reporter.silent=False)
    if not self.reporter.silent:
      self.reporter.set_current_iteration_summary(self.run_num)
      self.reporter = result_reporter.ResultReporter(silent=False)

  def _module_ended(self, event_data):
    """Nothing to do; balancing is handled by process_event()."""

  def _test_ended(self, event_data):
    """Works out the final status of the ended test and reports it.

    A pending failure, assumption failure or ignore marker is honoured, in
    that order, only if it names the test that is ending; otherwise the test
    is reported as passed. A marker that is used is cleared.

    Args:
        event_data: TEST_ENDED payload. Every key other than 'className',
          'testName' and 'end_time' is forwarded as additional_info.
    """
    name = self._full_test_name(event_data)

    test_time = ''
    if self.state['test_start_time']:
      test_time = self._calc_duration(
          event_data['end_time'] - self.state['test_start_time']
      )

    # `name` always contains '#', so it can never equal an unset (None) marker.
    trace = None
    pending_failure = self.state['last_failed']
    if pending_failure and name == pending_failure['name']:
      status = test_runner_base.FAILED_STATUS
      trace = pending_failure['trace']
      self.state['last_failed'] = None
    elif name == self.state['last_assumption_failed']:
      status = test_runner_base.ASSUMPTION_FAILED
      self.state['last_assumption_failed'] = None
    elif name == self.state['last_ignored']:
      status = test_runner_base.IGNORED_STATUS
      self.state['last_ignored'] = None
    else:
      status = test_runner_base.PASSED_STATUS

    additional_info = {
        key: value
        for key, value in event_data.items()
        if key not in ('className', 'end_time', 'testName')
    }
    self._report_result(
        status,
        trace,
        name,
        test_time=test_time,
        additional_info=additional_info,
    )

  def _log_association(self, event_data):
    """Stores the payload, stamping it with the current time if untimed."""
    event_data.setdefault('time', time.time())
    self.log_associations.append(event_data)

  # Event name -> plain function; invoked as handler(self, event_data).
  switch_handler = {
      EVENT_NAMES['module_started']: _module_started,
      EVENT_NAMES['run_started']: _run_started,
      EVENT_NAMES['test_started']: _test_started,
      EVENT_NAMES['test_failed']: _test_failed,
      EVENT_NAMES['test_ignored']: _test_ignored,
      EVENT_NAMES['test_assumption_failure']: _test_assumption_failure,
      EVENT_NAMES['run_failed']: _run_failed,
      EVENT_NAMES['invocation_failed']: _invocation_failed,
      EVENT_NAMES['invocation_ended']: _invocation_ended,
      EVENT_NAMES['test_ended']: _test_ended,
      EVENT_NAMES['run_ended']: _run_ended,
      EVENT_NAMES['module_ended']: _module_ended,
      EVENT_NAMES['log_association']: _log_association,
  }

  # (unfinished START event, received END event) -> message for the
  # imbalances that are understood and reported instead of raised.
  _CRASH_MESSAGES = {
      (EVENT_NAMES['run_started'], EVENT_NAMES['module_ended']): (
          'Test run started but did not end. This often happens when the test'
          ' binary/app such as android instrumentation app process died.'
          ' Test count might be inaccurate.'
      ),
      (EVENT_NAMES['test_started'], EVENT_NAMES['run_ended']): (
          'Test started but did not end. This often happens when the test'
          ' binary/app such as android instrumentation app process died.'
          ' Test count might be inaccurate.'
      ),
  }

  def process_event(self, event_name, event_data):
    """Process the events of the test run and call reporter with results.

    START events are pushed on the event stack, END events are checked against
    it, and then the handler registered in `switch_handler` (if any) runs.

    Args:
        event_name: A string of the event name.
        event_data: A dict of event data.

    Raises:
        EventHandleError: if an END event cannot be matched and the imbalance
          is not one of the understood cases.
    """
    logging.debug('Processing %s %s', event_name, event_data)
    if event_name in START_EVENTS:
      self.event_stack.append(event_name)
    elif event_name in END_EVENTS:
      self._check_events_are_balanced(event_name, self.reporter, event_data)
    if event_name in self.switch_handler:
      self.switch_handler[event_name](self, event_data)
    else:
      # TODO(b/128875503): Implement the mechanism to inform not handled
      # TF event.
      logging.debug('Event[%s] is not processable.', event_name)

  def _check_events_are_balanced(
      self, end_event_name, reporter, end_event_data
  ):
    """Check whether the Start events and End events are balanced.

    When imbalance events are detected, and we understand the case of
    imbalance, the events will be handled without throwing error; otherwise
    EventHandleError will raise.

    Args:
        end_event_name: A string of the event name.
        reporter: Unused; results always go to self.reporter. Kept so existing
          callers keep working.
        end_event_data: Unused; kept so existing callers keep working.

    Raises:
        EventHandleError if we can't handle the imbalance of START/END events.
    """
    start_event = self.event_stack.pop() if self.event_stack else None
    if start_event and EVENT_PAIRS[start_event] == end_event_name:
      return

    # Here bubble up the failed trace in the situation having
    # TEST_FAILED but never receiving TEST_ENDED.
    if self.state['last_failed'] and (
        start_event == EVENT_NAMES['test_started']
    ):
      self._report_result(
          test_runner_base.FAILED_STATUS,
          self.state['last_failed']['trace'],
          self.state['last_failed']['name'],
      )
      # Even though we have processed the test result here, we still consider
      # this case unhandled as we don't have a full understanding about the
      # cause. So we raise instead of returning.
      raise EventHandleError(
          'Error: Test failed without receiving a test end event'
      )

    message = self._CRASH_MESSAGES.get((start_event, end_event_name))
    if message is None:
      raise EventHandleError(
          'Error: Saw %s Start event and %s End event. These should be equal!'
          % (start_event, end_event_name)
      )

    atest_utils.print_and_log_error(message)
    self._report_result(
        test_runner_base.ERROR_STATUS, message, self.state['current_test']
    )
    # Only the unfinished inner START event has been popped so far. Also
    # consume the START event that really pairs with this END event; leaving
    # it on the stack would make the next outer END event look unbalanced and
    # report a second, spurious crash.
    if (
        self.event_stack
        and EVENT_PAIRS[self.event_stack[-1]] == end_event_name
    ):
      self.event_stack.pop()

  @staticmethod
  def _calc_duration(duration):
    """Convert duration from ms to a string such as (3h02m43.034s).

    Args:
        duration: millisecond

    Returns:
        '(<n>ms)' below one second, '(<s>s)' below one minute, '(<mm>m<s>s)'
        below one hour, otherwise '(<h>h<mm>m<s>s)'. Minutes are zero-padded
        to two digits and seconds carry three decimals. Whole days are folded
        into the hour count.
    """
    if duration < ONE_SECOND:
      return '({}ms)'.format(duration)

    # Use the timedelta fields rather than parsing str(delta): the string
    # form gains an 'N day(s), ' prefix from 24 hours on.
    delta = timedelta(milliseconds=duration)
    seconds = delta.seconds % 60 + delta.microseconds / 1000000.0
    if duration < ONE_MINUTE:
      return '({:.3f}s)'.format(seconds)

    minutes = '{:02d}'.format(delta.seconds % 3600 // 60)
    if duration < ONE_HOUR:
      return '({0}m{1:.3f}s)'.format(minutes, seconds)

    hours = delta.days * 24 + delta.seconds // 3600
    return '({0}h{1}m{2:.3f}s)'.format(hours, minutes, seconds)
391