#!/usr/bin/env python3
#
# Copyright 2017, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

17"""Command line utility for running Android tests through TradeFederation.
18
19atest helps automate the flow of building test modules across the Android
20code base and executing the tests via the TradeFederation test harness.
21
22atest is designed to support any test types that can be ran by TradeFederation.
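
Example usage (placeholders, not real module names):
    atest <test_module_name>
    atest <test_module_name> -- <args_forwarded_to_the_test_runner>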
23"""
24
25# pylint: disable=too-many-lines
26
27from __future__ import annotations
28from __future__ import print_function
29
30import abc
31import argparse
32import collections
33import dataclasses
34import functools
35import itertools
36import logging
37import os
38import platform
39import subprocess
40import sys
41import tempfile
42import threading
43import time
44from typing import Any, Dict, List, Set
45
46from atest import arg_parser
47from atest import atest_configs
48from atest import atest_execution_info
49from atest import atest_utils
50from atest import banner
51from atest import bazel_mode
52from atest import bug_detector
53from atest import cli_translator
54from atest import constants
55from atest import device_update
56from atest import module_info
57from atest import result_reporter
58from atest import test_runner_handler
59from atest.atest_enum import DetectType
60from atest.atest_enum import ExitCode
61from atest.coverage import coverage
62from atest.metrics import metrics
63from atest.metrics import metrics_base
64from atest.metrics import metrics_utils
65from atest.test_finders import test_finder_utils
66from atest.test_finders import test_info
67from atest.test_finders.test_info import TestInfo
68from atest.test_runner_invocation import TestRunnerInvocation
69from atest.tools import indexing
70from atest.tools import start_avd as avd
71
72EXPECTED_VARS = frozenset([
73    constants.ANDROID_BUILD_TOP,
74    'ANDROID_TARGET_OUT_TESTCASES',
75    constants.ANDROID_OUT,
76])
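# strftime pattern for naming test run directories, e.g. '20240101_123456'.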
TEST_RUN_DIR_PREFIX = '%Y%m%d_%H%M%S'
CUSTOM_ARG_FLAG = '--'
OPTION_NOT_FOR_TEST_MAPPING = (
    'Option "{}" does not work for running tests in TEST_MAPPING files'
)

DEVICE_TESTS = 'tests that require device'
HOST_TESTS = 'tests that do NOT require device'
RESULT_HEADER_FMT = '\nResults from %(test_type)s:'
RUN_HEADER_FMT = '\nRunning %(test_count)d %(test_type)s.'
TEST_COUNT = 'test_count'
TEST_TYPE = 'test_type'
END_OF_OPTION = '--'
HAS_IGNORED_ARGS = False
# Conditions under which atest should exit without sending results to metrics.
EXIT_CODES_BEFORE_TEST = [
    ExitCode.ENV_NOT_SETUP,
    ExitCode.TEST_NOT_FOUND,
    ExitCode.OUTSIDE_ROOT,
    ExitCode.AVD_CREATE_FAILURE,
    ExitCode.AVD_INVALID_ARGS,
]

# Stdout print prefix for the results directory. May be used in integration tests.
_RESULTS_DIR_PRINT_PREFIX = 'Atest results and logs directory: '
# Log prefix for the dry-run command. May be used in integration tests.
_DRY_RUN_COMMAND_LOG_PREFIX = 'Internal run command from dry-run: '


@dataclasses.dataclass
class Steps:
  """A dataclass that stores enabled steps."""

  build: bool
  install: bool
  test: bool


def parse_steps(args: arg_parser.AtestArgParser) -> Steps:
  """Return a Steps object.

  Args:
      args: an AtestArgParser object.

  Returns:
      A Steps object that stores the booleans of build, install and test.
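
  Example:
      When args.steps == [constants.BUILD_STEP, constants.TEST_STEP], this
      returns Steps(build=True, install=False, test=True).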
123  """
124  # Implicitly running 'build', 'install' and 'test' when args.steps is None.
125  if not args.steps:
126    return Steps(True, True, True)
127  build = constants.BUILD_STEP in args.steps
128  test = constants.TEST_STEP in args.steps
129  install = constants.INSTALL_STEP in args.steps
130  if install and not test:
131    atest_utils.print_and_log_warning(
132        'Installing without test step is currently not '
133        'supported; Atest will proceed testing!'
134    )
135    test = True
136  return Steps(build, install, test)
137
138
139def _get_args_from_config():
140  """Get customized atest arguments in the config file.
141
142  If the config has not existed yet, atest will initialize an example
143  config file for it without any effective options.
144
145  Returns:
146      A list read from the config file.
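
  Example:
      A config line such as `--verbose  # always dump DEBUG logs` contributes
      ['--verbose'] to the returned list; the '#' comment is stripped.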
147  """
148  _config = atest_utils.get_config_folder().joinpath('config')
149  if not _config.parent.is_dir():
150    _config.parent.mkdir(parents=True)
151  args = []
152  if not _config.is_file():
153    with open(_config, 'w+', encoding='utf8') as cache:
154      cache.write(constants.ATEST_EXAMPLE_ARGS)
155    return args
156  warning = 'Line {} contains {} and will be ignored.'
157  print(
158      '\n{} {}'.format(
159          atest_utils.mark_cyan('Reading config:'),
160          _config,
161      )
162  )
163  # pylint: disable=global-statement:
164  global HAS_IGNORED_ARGS
165  with open(_config, 'r', encoding='utf8') as cache:
166    for entry in cache.readlines():
167      # Strip comments.
168      arg_in_line = entry.partition('#')[0].strip()
169      # Strip test name/path.
170      if arg_in_line.startswith('-'):
171        # Process argument that contains whitespaces.
172        # e.g. ["--serial foo"] -> ["--serial", "foo"]
173        if len(arg_in_line.split()) > 1:
174          # remove "--" to avoid messing up atest/tradefed commands.
175          if END_OF_OPTION in arg_in_line.split():
176            HAS_IGNORED_ARGS = True
177            print(
178                warning.format(
179                    atest_utils.mark_yellow(arg_in_line), END_OF_OPTION
180                )
181            )
182          args.extend(arg_in_line.split())
183        else:
184          if END_OF_OPTION == arg_in_line:
185            HAS_IGNORED_ARGS = True
186            print(
187                warning.format(
188                    atest_utils.mark_yellow(arg_in_line), END_OF_OPTION
189                )
190            )
191          args.append(arg_in_line)
192  return args
193
194
195def _parse_args(argv: List[str]) -> argparse.Namespace:
196  """Parse command line arguments.
197
198  Args:
199      argv: A list of arguments.
200
201  Returns:
202      A Namespace holding parsed args
203  """
204  # Store everything after '--' in custom_args.
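  # e.g. `atest SomeTest -- --some-runner-flag value` stores the quoted
  # '--some-runner-flag' and 'value' in args.custom_args (hypothetical flags,
  # for illustration only).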
  pruned_argv = argv
  custom_args_index = None
  if CUSTOM_ARG_FLAG in argv:
    custom_args_index = argv.index(CUSTOM_ARG_FLAG)
    pruned_argv = argv[:custom_args_index]
  args = arg_parser.create_atest_arg_parser().parse_args(pruned_argv)
  args.custom_args = []
  if custom_args_index is not None:
    for arg in argv[custom_args_index + 1 :]:
      logging.debug('Quoting regex argument %s', arg)
      args.custom_args.append(atest_utils.quote(arg))

  return args


def _configure_logging(verbose: bool, results_dir: str):
  """Configure the logger.

  Args:
      verbose: If true, display DEBUG level logs on console.
      results_dir: A directory which stores the ATest execution information.
  """
  log_fmat = '%(asctime)s %(filename)s:%(lineno)s:%(levelname)s: %(message)s'
  date_fmt = '%Y-%m-%d %H:%M:%S'
  log_path = os.path.join(results_dir, 'atest.log')

  logger = logging.getLogger('')
  # Clear the handlers to prevent logging.basicConfig from being called twice.
  logger.handlers = []

  logging.basicConfig(
      filename=log_path, level=logging.DEBUG, format=log_fmat, datefmt=date_fmt
  )

  class _StreamToLogger:
    """A file-like class that redirects writes to a printer and a logger."""

    def __init__(self, logger, log_level, printer):
      self._logger = logger
      self._log_level = log_level
      self._printer = printer
      self._buffers = []

    def write(self, buf: str) -> None:
      self._printer.write(buf)

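      # Buffer fragments until a lone newline write arrives, then emit the
      # joined fragments to the logger as a single record.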
      if len(buf) == 1 and buf[0] == '\n' and self._buffers:
        self._logger.log(self._log_level, ''.join(self._buffers))
        self._buffers.clear()
      else:
        self._buffers.append(buf)

    def flush(self) -> None:
      self._printer.flush()

  stdout_log_level = 25
  stderr_log_level = 45
  logging.addLevelName(stdout_log_level, 'STDOUT')
  logging.addLevelName(stderr_log_level, 'STDERR')
  sys.stdout = _StreamToLogger(logger, stdout_log_level, sys.stdout)
  sys.stderr = _StreamToLogger(logger, stderr_log_level, sys.stderr)


def _missing_environment_variables():
  """Verify the local environment has been set up to run atest.

  Returns:
      List of strings of any missing environment variables.
  """
  missing = [x for x in EXPECTED_VARS if not os.environ.get(x)]
  if missing:
    atest_utils.print_and_log_error(
        "Local environment doesn't appear to have been "
        'initialized. Did you remember to run lunch? Expected '
        'Environment Variables: %s.',
        missing,
    )
  return missing


def make_test_run_dir() -> str:
  """Make the test run dir in ATEST_RESULT_ROOT.

  Returns:
      A string of the dir path.
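      e.g. <ATEST_RESULT_ROOT>/20240101_123456_XXXXXX, where the trailing
      suffix is randomized by tempfile.mkdtemp.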
292  """
293  if not os.path.exists(constants.ATEST_RESULT_ROOT):
294    os.makedirs(constants.ATEST_RESULT_ROOT)
295  ctime = time.strftime(TEST_RUN_DIR_PREFIX, time.localtime())
296  test_result_dir = tempfile.mkdtemp(
297      prefix='%s_' % ctime, dir=constants.ATEST_RESULT_ROOT
298  )
299  print(_RESULTS_DIR_PRINT_PREFIX + test_result_dir)
300  return test_result_dir
301
302
303def get_extra_args(args):
304  """Get extra args for test runners.
305
306  Args:
307      args: arg parsed object.
308
309  Returns:
310      Dict of extra args for test runners to utilize.
311  """
312  extra_args = {}
313  if args.wait_for_debugger:
314    extra_args[constants.WAIT_FOR_DEBUGGER] = None
315  if not parse_steps(args).install:
316    extra_args[constants.DISABLE_INSTALL] = None
317  # The key and its value of the dict can be called via:
318  # if args.aaaa:
319  #     extra_args[constants.AAAA] = args.aaaa
320  arg_maps = {
321      'all_abi': constants.ALL_ABI,
322      'annotation_filter': constants.ANNOTATION_FILTER,
323      'bazel_arg': constants.BAZEL_ARG,
324      'collect_tests_only': constants.COLLECT_TESTS_ONLY,
325      'experimental_coverage': constants.COVERAGE,
326      'custom_args': constants.CUSTOM_ARGS,
327      'device_only': constants.DEVICE_ONLY,
328      'disable_teardown': constants.DISABLE_TEARDOWN,
329      'disable_upload_result': constants.DISABLE_UPLOAD_RESULT,
330      'dry_run': constants.DRY_RUN,
331      'host': constants.HOST,
332      'instant': constants.INSTANT,
333      'iterations': constants.ITERATIONS,
334      'request_upload_result': constants.REQUEST_UPLOAD_RESULT,
335      'bazel_mode_features': constants.BAZEL_MODE_FEATURES,
336      'rerun_until_failure': constants.RERUN_UNTIL_FAILURE,
337      'retry_any_failure': constants.RETRY_ANY_FAILURE,
338      'serial': constants.SERIAL,
339      'sharding': constants.SHARDING,
340      'test_filter': constants.TEST_FILTER,
341      'test_timeout': constants.TEST_TIMEOUT,
342      'tf_debug': constants.TF_DEBUG,
343      'tf_template': constants.TF_TEMPLATE,
344      'user_type': constants.USER_TYPE,
345      'verbose': constants.VERBOSE,
346      'use_tf_min_base_template': constants.USE_TF_MIN_BASE_TEMPLATE,
347  }
348  not_match = [k for k in arg_maps if k not in vars(args)]
349  if not_match:
350    raise AttributeError(
351        '%s object has no attribute %s' % (type(args).__name__, not_match)
352    )
353  extra_args.update({
354      arg_maps.get(k): v for k, v in vars(args).items() if arg_maps.get(k) and v
355  })
356  return extra_args


def _validate_exec_mode(args, test_infos: list[TestInfo], host_tests=None):
  """Validate that all test execution modes are not in conflict.

  Exit the program with INVALID_EXEC_MODE code if the user requested a
  host-side run but a given test is device-only.

  If a given test is host-only and `args.host` is not specified, forcibly
  set `args.host` to True.

  Args:
      args: parsed args object.
      test_infos: a list of TestInfo objects.
      host_tests: True if all tests should be deviceless, False if all tests
        should be device tests. Default is set to None, which means tests can be
        either deviceless or device tests.
  """
  all_device_modes = {x.get_supported_exec_mode() for x in test_infos}
  err_msg = None
  # In the case of '$atest <device-only> --host', exit.
  if (host_tests or args.host) and constants.DEVICE_TEST in all_device_modes:
    device_only_tests = [
        x.test_name
        for x in test_infos
        if x.get_supported_exec_mode() == constants.DEVICE_TEST
    ]
    err_msg = (
        'Specified --host, but the following tests are device-only:\n  '
        + '\n  '.join(sorted(device_only_tests))
        + '\nPlease remove the --host option when running device-only tests.'
    )
  # In the case of '$atest <host-only> <device-only> --host' or
  # '$atest <host-only> <device-only>', exit.
  if (
      constants.DEVICELESS_TEST in all_device_modes
      and constants.DEVICE_TEST in all_device_modes
  ):
    err_msg = 'There are host-only and device-only tests in the command.'
  if host_tests is False and constants.DEVICELESS_TEST in all_device_modes:
    err_msg = 'There are host-only tests in the command.'
  if err_msg:
    atest_utils.print_and_log_error(err_msg)
    metrics_utils.send_exit_event(ExitCode.INVALID_EXEC_MODE, logs=err_msg)
    sys.exit(ExitCode.INVALID_EXEC_MODE)
  # 'adb' may not be available right after the first repo sync or on a clean
  # build; in that case `adb devices` is checked again in the build step.
  if atest_utils.has_command('adb'):
    _validate_adb_devices(args, test_infos)
  # In the case of '$atest <host-only>', we add --host to run on host-side.
  # The option should only be overridden if `host_tests` is not set.
  if not args.host and host_tests is None:
    logging.debug('Appending "--host" for a deviceless test...')
    args.host = bool(constants.DEVICELESS_TEST in all_device_modes)


def _validate_adb_devices(args, test_infos):
  """Validate the availability of connected devices via the adb command.

  Exit the program with DEVICE_NOT_FOUND code if a device test is requested
  but no device is connected.

  Args:
      args: parsed args object.
      test_infos: a list of TestInfo objects.
  """
  # No need to check device availability if the user does not run the test
  # step.
  if not parse_steps(args).test:
    return
  if args.no_checking_device:
    return
  # No need to check local device availability if the device test is running
  # remotely.
  if args.bazel_mode_features and (
      bazel_mode.Features.EXPERIMENTAL_REMOTE_AVD in args.bazel_mode_features
  ):
    return
  all_device_modes = {x.get_supported_exec_mode() for x in test_infos}
  device_tests = [
      x.test_name
      for x in test_infos
      if x.get_supported_exec_mode() != constants.DEVICELESS_TEST
  ]
  # Only block testing if it is a device test.
  if constants.DEVICE_TEST in all_device_modes:
    if (
        not any((args.host, args.start_avd, args.acloud_create))
        and not atest_utils.get_adb_devices()
    ):
      err_msg = (
          f'Stop running test(s): {", ".join(device_tests)} require a device.'
      )
      atest_utils.colorful_print(err_msg, constants.RED)
      logging.debug(atest_utils.mark_red(constants.REQUIRE_DEVICES_MSG))
      metrics_utils.send_exit_event(ExitCode.DEVICE_NOT_FOUND, logs=err_msg)
      sys.exit(ExitCode.DEVICE_NOT_FOUND)


def _validate_tm_tests_exec_mode(
    args: argparse.Namespace,
    device_test_infos: List[test_info.TestInfo],
    host_test_infos: List[test_info.TestInfo],
):
  """Validate that all test execution modes are not in conflict.

  Validate the tests' platform variant setting. For device tests, exit the
  program if any test is found to be host-only. For host tests, exit the
  program if any test is found to be device-only.

  Args:
      args: parsed args object.
      device_test_infos: TestInfo instances for device tests.
      host_test_infos: TestInfo instances for host tests.
  """

  # No need to verify device tests if the atest command is set to only run
  # host tests.
  if device_test_infos and not args.host:
    _validate_exec_mode(args, device_test_infos, host_tests=False)
  if host_test_infos:
    _validate_exec_mode(args, host_test_infos, host_tests=True)


def _has_valid_test_mapping_args(args):
  """Validate test mapping args.

  Not all args work when running tests in TEST_MAPPING files. Validate the
  args before running the tests.

  Args:
      args: parsed args object.

  Returns:
      True if args are valid
  """
  is_test_mapping = atest_utils.is_test_mapping(args)
  if is_test_mapping:
    metrics.LocalDetectEvent(detect_type=DetectType.IS_TEST_MAPPING, result=1)
  else:
    metrics.LocalDetectEvent(detect_type=DetectType.IS_TEST_MAPPING, result=0)
  if not is_test_mapping:
    return True
  options_to_validate = [
      (args.annotation_filter, '--annotation-filter'),
  ]
  for arg_value, arg in options_to_validate:
    if arg_value:
      atest_utils.print_and_log_error(
          atest_utils.mark_red(OPTION_NOT_FOR_TEST_MAPPING.format(arg))
      )
      return False
  return True


def _print_deprecation_warning(arg_to_deprecate: str):
  """Print a warning about a feature that will soon be deprecated.

  Args:
      arg_to_deprecate: the arg with which the to-be-deprecated feature is
        called.
  """
  args_to_deprecation_info = {
      # arg_to_deprecate : (deprecation timeframe, additional info for users)
      '--info': ('is deprecated.', '\nUse CodeSearch or `gomod` instead.')
  }

  warning_message = (
      f'\nWARNING: The `{arg_to_deprecate}` feature '
      + ' '.join(args_to_deprecation_info[arg_to_deprecate])
      + '\nPlease file a bug or feature request to the Atest team if you have'
      ' any concerns.'
  )
  atest_utils.colorful_print(warning_message, constants.RED)


def is_from_test_mapping(test_infos):
  """Check whether the test_infos came from TEST_MAPPING files.

  Args:
      test_infos: A set of TestInfos.

  Returns:
      True if the test infos are from TEST_MAPPING files.
  """
  return next(iter(test_infos)).from_test_mapping


def _split_test_mapping_tests(test_infos):
  """Split Test Mapping tests into 2 groups: device tests and host tests.

  Args:
      test_infos: A set of TestInfos.

  Returns:
      A tuple of (device_test_infos, host_test_infos), where
      device_test_infos: A set of TestInfos for tests that require device.
      host_test_infos: A set of TestInfos for tests that do NOT require
          device.
  """
  assert is_from_test_mapping(test_infos)
  host_test_infos = {info for info in test_infos if info.host}
  device_test_infos = {info for info in test_infos if not info.host}
  return device_test_infos, host_test_infos


def _exclude_modules_in_targets(build_targets):
  """Method that excludes MODULES-IN-* targets.

  Args:
      build_targets: A set of build targets.

  Returns:
      A set of build targets that excludes MODULES-IN-*.
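      e.g. {'MODULES-IN-foo', 'bar'} -> {'bar'} (hypothetical target names).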
571  """
572  shrank_build_targets = build_targets.copy()
573  logging.debug(
574      'Will exclude all "%s*" from the build targets.', constants.MODULES_IN
575  )
576  for target in build_targets:
577    if target.startswith(constants.MODULES_IN):
578      logging.debug('Ignore %s.', target)
579      shrank_build_targets.remove(target)
580  return shrank_build_targets
581
582
583def get_device_count_config(test_infos, mod_info):
584  """Get the amount of desired devices from the test config.
585
586  Args:
587      test_infos: A set of TestInfo instances.
588      mod_info: ModuleInfo object.
589
590  Returns: the count of devices in test config. If there are more than one
591           configs, return the maximum.
592  """
593  max_count = 0
594  for tinfo in test_infos:
595    test_config, _ = test_finder_utils.get_test_config_and_srcs(tinfo, mod_info)
596    if test_config:
597      devices = atest_utils.get_config_device(test_config)
598      if devices:
599        max_count = max(len(devices), max_count)
600  return max_count
601
602
603def has_set_sufficient_devices(
604    required_amount: int, serial: List[str] = None
605) -> bool:
606  """Detect whether sufficient device serial is set for test."""
  given_amount = len(serial) if serial else 0
  # Only check when both given_amount and required_amount are non-zero.
  if all((given_amount, required_amount)):
    # Based on TF rules, given_amount can be greater than or equal to
    # required_amount.
    if required_amount > given_amount:
      atest_utils.colorful_print(
          f'The test requires {required_amount} devices, '
          f'but {given_amount} were given.',
          constants.RED,
      )
      return False
  return True


def setup_metrics_tool_name(no_metrics: bool = False):
  """Set up tool_name and sub_tool_name for MetricsBase."""
  if (
      not no_metrics
      and metrics_base.MetricsBase.user_type == metrics_base.INTERNAL_USER
  ):
    metrics_utils.print_data_collection_notice()

    user_from_tool = os.getenv(constants.USER_FROM_TOOL)
    metrics_base.MetricsBase.tool_name = user_from_tool or constants.TOOL_NAME

    user_from_sub_tool = os.getenv(constants.USER_FROM_SUB_TOOL)
    metrics_base.MetricsBase.sub_tool_name = (
        user_from_sub_tool or constants.SUB_TOOL_NAME
    )


class _AtestMain:
  """Entry point of the atest script."""

  def __init__(
      self,
      argv: list[str],
  ):
    """Initializes the _AtestMain object.

    Args:
        argv: A list of command line arguments.
    """
    self._argv: list[str] = argv

    self._banner_printer: banner.BannerPrinter = None
    self._steps: Steps = None
    self._results_dir: str = None
    self._mod_info: module_info.ModuleInfo = None
    self._test_infos: list[test_info.TestInfo] = None
    self._test_execution_plan: _TestExecutionPlan = None

    self._acloud_proc: subprocess.Popen = None
    self._acloud_report_file: str = None
    self._test_info_loading_duration: float = 0
    self._build_duration: float = 0
    self._module_info_rebuild_required: bool = False
    self._is_out_clean_before_module_info_build: bool = False
    self._invocation_begin_time: float = None

  def run(self):
    self._results_dir = make_test_run_dir()

    if END_OF_OPTION in self._argv:
      end_position = self._argv.index(END_OF_OPTION)
      final_args = [
          *self._argv[1:end_position],
          *_get_args_from_config(),
          *self._argv[end_position:],
      ]
    else:
      final_args = [*self._argv[1:], *_get_args_from_config()]
    if final_args != self._argv[1:]:
      print(
          'The actual cmd will be: \n\t{}\n'.format(
              atest_utils.mark_cyan('atest ' + ' '.join(final_args))
          )
      )
      metrics.LocalDetectEvent(detect_type=DetectType.ATEST_CONFIG, result=1)
      if HAS_IGNORED_ARGS:
        atest_utils.colorful_print(
            'Please correct the config and try again.', constants.YELLOW
        )
        sys.exit(ExitCode.EXIT_BEFORE_MAIN)
    else:
      metrics.LocalDetectEvent(detect_type=DetectType.ATEST_CONFIG, result=0)

    self._args = _parse_args(final_args)
    atest_configs.GLOBAL_ARGS = self._args
    _configure_logging(self._args.verbose, self._results_dir)

    logging.debug(
        'Start of atest run. sys.argv: %s, final_args: %s',
        self._argv,
        final_args,
    )

    self._steps = parse_steps(self._args)

    self._banner_printer = banner.BannerPrinter.create()

    with atest_execution_info.AtestExecutionInfo(
        final_args, self._results_dir, atest_configs.GLOBAL_ARGS
    ):
      setup_metrics_tool_name(atest_configs.GLOBAL_ARGS.no_metrics)

      logging.debug(
          'Creating atest script with argv: %s\n  results_dir: %s\n  args: %s\n'
          '  run id: %s',
          self._argv,
          self._results_dir,
          self._args,
          metrics.get_run_id(),
      )
      exit_code = self._run_all_steps()
      detector = bug_detector.BugDetector(final_args, exit_code)
      if exit_code not in EXIT_CODES_BEFORE_TEST:
        metrics.LocalDetectEvent(
            detect_type=DetectType.BUG_DETECTED, result=detector.caught_result
        )

    self._banner_printer.print()

    sys.exit(exit_code)

  def _check_no_action_argument(self) -> int:
    """Handle non-action arguments such as --version, --history and --latest_result.

    Returns:
        Exit code if no action is required. None otherwise.
    """
    if self._args.version:
      print(atest_utils.get_atest_version())
      return ExitCode.SUCCESS
    if self._args.history:
      atest_execution_info.print_test_result(
          constants.ATEST_RESULT_ROOT, self._args.history
      )
      return ExitCode.SUCCESS
    if self._args.latest_result:
      atest_execution_info.print_test_result_by_path(
          constants.LATEST_RESULT_FILE
      )
      return ExitCode.SUCCESS
    return None

  def _check_envs_and_args(self) -> int:
    """Validate environment variables and args.

    Returns:
        Exit code if any setup or arg is invalid. None otherwise.
    """
    if (
        os.getenv(constants.ANDROID_BUILD_TOP, ' ') not in os.getcwd()
    ):  # Not under the android root.
      atest_utils.colorful_print(
          '\nAtest must always work under ${}!'.format(
              constants.ANDROID_BUILD_TOP
          ),
          constants.RED,
      )
      return ExitCode.OUTSIDE_ROOT
    if _missing_environment_variables():
      return ExitCode.ENV_NOT_SETUP
    if not _has_valid_test_mapping_args(self._args):
      return ExitCode.INVALID_TM_ARGS

    # Check whether the ANDROID_SERIAL environment variable is set to an
    # empty string.
    if 'ANDROID_SERIAL' in os.environ and not os.environ['ANDROID_SERIAL']:
      atest_utils.print_and_log_warning(
          'Empty device serial detected in the ANDROID_SERIAL environment'
          ' variable. This may cause unexpected behavior in TradeFed. If not'
          ' targeting a specific device, consider unsetting the ANDROID_SERIAL'
          ' environment variable. See b/330365573 for details.'
      )

    # Check whether any empty serial strings exist in the argument array.
    if self._args.serial and not all(self._args.serial):
      atest_utils.print_and_log_warning(
          'Empty device serial specified via command-line argument. This may'
          ' cause unexpected behavior in TradeFed. If not targeting a specific'
          ' device, consider removing the serial argument. See b/330365573 for'
          ' details.'
      )

    return None

  def _update_build_env(self):
    """Updates build environment variables."""
    # Set coverage environment variables.
    if self._args.experimental_coverage:
      atest_utils.update_build_env(coverage.build_env_vars())

    # Update the environment variable dict according to args.build_output.
    atest_utils.update_build_env({
        'ANDROID_QUIET_BUILD': 'true',
        'BUILD_OUTPUT_MODE': self._args.build_output.value,
    })

  def _start_acloud_if_requested(self) -> None:
    if not self._args.acloud_create and not self._args.start_avd:
      return
    if not parse_steps(self._args).test:
      print('acloud/avd is requested but ignored because no test is requested.')
      return
    print('Creating acloud/avd...')
    self._acloud_proc, self._acloud_report_file = avd.acloud_create_validator(
        self._results_dir, self._args
    )

  def _check_acloud_status(self) -> int:
    """Checks the acloud status if acloud was requested.

    Returns:
        acloud status code. None if acloud was not requested.
    """
    if self._acloud_proc:
      self._acloud_proc.join()
      status = avd.probe_acloud_status(
          self._acloud_report_file,
          self._test_info_loading_duration + self._build_duration,
      )
      return status
    return None

  def _start_indexing_if_required(self) -> threading.Thread:
    """Starts indexing if required.

    Returns:
        A thread that runs indexing. None if no indexing is required.
    """
    if not self._steps.build:
      logging.debug("Skip indexing because there's no build required.")
      return None

    if indexing.Indices().has_all_indices():
      no_indexing_args = (
          self._args.dry_run,
          self._args.list_modules,
      )
      if any(no_indexing_args):
        logging.debug(
            'Skip indexing for no_indexing_args=%s.', no_indexing_args
        )
        return None
    else:
      logging.debug(
          'Indexing targets is required because some index files do not exist.'
      )

    logging.debug('Starting to index targets in a background thread.')
    return atest_utils.start_threading(
        indexing.index_targets,
        daemon=True,
    )

  @functools.cache
  def _get_device_update_method(self) -> device_update.AdeviceUpdateMethod:
    """Creates a device update method."""
    return device_update.AdeviceUpdateMethod(
        targets=set(self._args.update_modules or [])
    )

  def _get_device_update_dependencies(self) -> set[str]:
    """Gets device update dependencies.

    Returns:
        A set of dependencies for the device update method.
    """
    if not self._args.update_device:
      return set()

    if (
        self._test_execution_plan
        and not self._test_execution_plan.requires_device_update()
    ):
      return set()

    return self._get_device_update_method().dependencies()

  def _need_rebuild_module_info(self) -> bool:
    """Method that tells whether we need to rebuild module-info.json or not.

    Returns:
        True for a forced or smart rebuild, otherwise False (no rebuilding).
    """
    # +-----------------+
    # | Explicitly pass |  yes
    # |    '--test'     +-------> False (won't rebuild)
    # +--------+--------+
    #          | no
    #          V
    # +-------------------------+
    # | Explicitly pass         |  yes
    # | '--rebuild-module-info' +-------> True (forced rebuild)
    # +--------+----------------+
    #          | no
    #          V
    # +-------------------+
    # |    Build files    |  no
    # | integrity is good +-------> True (smart rebuild)
    # +--------+----------+
    #          | yes
    #          V
    #        False (won't rebuild)
    if not self._steps.build:
      logging.debug('"--test" mode detected, will not rebuild module-info.')
      return False
    if self._args.rebuild_module_info:
      return True
    logging.debug('Examining the consistency of build files...')
    if not atest_utils.build_files_integrity_is_ok():
      logging.debug('Found that build files were changed.')
      return True
    return False

  def _load_module_info(self):
    self._is_out_clean_before_module_info_build = not os.path.exists(
        os.environ.get(constants.ANDROID_PRODUCT_OUT, '')
    )
    self._module_info_rebuild_required = self._need_rebuild_module_info()
    logging.debug(
        'need_rebuild_module_info returned %s',
        self._module_info_rebuild_required,
    )

    self._mod_info = module_info.load(
        force_build=self._module_info_rebuild_required,
        sqlite_module_cache=self._args.sqlite_module_cache,
    )
    logging.debug('Obtained module info object: %s', self._mod_info)

  def _load_test_info_and_execution_plan(self) -> int | None:
    """Loads test info and execution plan.

    Returns:
        Exit code if anything went wrong. None otherwise.
    """
    indexing_thread = self._start_indexing_if_required()

    self._load_module_info()

    translator = cli_translator.CLITranslator(
        mod_info=self._mod_info,
        print_cache_msg=not self._args.clear_cache,
        bazel_mode_enabled=self._args.bazel_mode,
        host=self._args.host,
        bazel_mode_features=self._args.bazel_mode_features,
        indexing_thread=indexing_thread,
    )

    find_start = time.time()
    self._test_infos = translator.translate(self._args)

    _AtestMain._inject_default_arguments_based_on_test_infos(
        self._test_infos, self._args
    )

    # Only check for sufficient devices if not dry run.
    self._args.device_count_config = get_device_count_config(
        self._test_infos, self._mod_info
    )
    if not self._args.dry_run and not has_set_sufficient_devices(
        self._args.device_count_config, self._args.serial
    ):
      return ExitCode.INSUFFICIENT_DEVICES

    self._test_info_loading_duration = time.time() - find_start
    if not self._test_infos:
      return ExitCode.TEST_NOT_FOUND

    self._test_execution_plan = _TestExecutionPlan.create(
        args=self._args,
        test_infos=self._test_infos,
        results_dir=self._results_dir,
        mod_info=self._mod_info,
    )

    return None

  @staticmethod
  def _inject_default_arguments_based_on_test_infos(
      test_infos: list[test_info.TestInfo], args: argparse.Namespace
  ) -> None:
    if any(
        'performance-tests' in info.compatibility_suites for info in test_infos
    ):
      if not args.disable_upload_result:
        args.request_upload_result = True

  def _handle_list_modules(self) -> int:
    """Print the testable modules for a given suite.

    Returns:
        Exit code.
    """
    self._load_module_info()

    testable_modules = self._mod_info.get_testable_modules(
        self._args.list_modules
    )
    print(
        '\n%s'
        % atest_utils.mark_cyan(
            '%s Testable %s modules'
            % (len(testable_modules), self._args.list_modules)
        )
    )
    print(atest_utils.delimiter('-'))
    for module in sorted(testable_modules):
      print('\t%s' % module)

    return ExitCode.SUCCESS

  def _handle_dry_run(self) -> int:
    """Only print the commands of the target tests rather than running them.

    Returns:
        Exit code.
    """
    error_code = self._load_test_info_and_execution_plan()
    if error_code is not None:
      return error_code

    print(
        'Would build the following targets: %s'
        % (atest_utils.mark_green('%s' % self._get_build_targets()))
    )

    all_run_cmds = []
    for test_runner, tests in test_runner_handler.group_tests_by_test_runners(
        self._test_infos
    ):
      runner = test_runner(
          self._results_dir,
          mod_info=self._mod_info,
          extra_args=self._test_execution_plan.extra_args,
      )
      run_cmds = runner.generate_run_commands(
          tests, self._test_execution_plan.extra_args
      )
      for run_cmd in run_cmds:
        all_run_cmds.append(run_cmd)
        logging.debug(_DRY_RUN_COMMAND_LOG_PREFIX + run_cmd)
        print(
            'Would run test via command: %s' % (atest_utils.mark_green(run_cmd))
        )

    return ExitCode.SUCCESS

  def _update_device_if_requested(self) -> None:
    """Runs the device update step."""
    if not self._args.update_device:
      if self._test_execution_plan.requires_device_update():
        self._banner_printer.register(
            'Tips: If your test requires device update, consider '
            'http://go/atest-single-command to simplify your workflow!'
        )
      return
    if not self._steps.test:
      print(
          'Device update requested but skipped due to running in build-only'
          ' mode.'
      )
      return

    if not self._test_execution_plan.requires_device_update():
      atest_utils.colorful_print(
          '\nWarning: Device update ignored because it is not required by '
          'tests in this invocation.',
          constants.YELLOW,
      )
      return

    device_update_start = time.time()
    self._get_device_update_method().update(
        self._test_execution_plan.extra_args.get(constants.SERIAL, [])
    )
    device_update_duration = time.time() - device_update_start
    logging.debug('Updating device took %ss', device_update_duration)
    metrics.LocalDetectEvent(
        detect_type=DetectType.DEVICE_UPDATE_MS,
        result=int(round(device_update_duration * 1000)),
    )

  def _get_build_targets(self) -> set[str]:
    """Gets the build targets."""
    build_targets = self._test_execution_plan.required_build_targets()

    # Remove MODULES-IN-* from build targets by default.
    if not self._args.use_modules_in:
      build_targets = _exclude_modules_in_targets(build_targets)

    if not build_targets:
      return None

    if self._args.experimental_coverage:
      build_targets.update(coverage.build_modules())

    # Add the module-info.json target to the list of build targets to keep
    # the file up to date.
    build_targets.add(module_info.get_module_info_target())

    build_targets |= self._get_device_update_dependencies()
    return build_targets

  def _run_build_step(self) -> int:
    """Runs the build step.

    Returns:
        Exit code if failed. None otherwise.
    """
    build_targets = self._get_build_targets()

    # Add -jx as a build target if the user specifies it.
    if self._args.build_j:
      build_targets.add(f'-j{self._args.build_j}')

    build_start = time.time()
    success = atest_utils.build(build_targets)
    self._build_duration = time.time() - build_start
    metrics.BuildFinishEvent(
        duration=metrics_utils.convert_duration(self._build_duration),
        success=success,
        targets=build_targets,
    )
    metrics.LocalDetectEvent(
        detect_type=DetectType.BUILD_TIME_PER_TARGET,
        result=int(round(self._build_duration / len(build_targets))),
    )
    rebuild_module_info = DetectType.NOT_REBUILD_MODULE_INFO
    if self._is_out_clean_before_module_info_build:
      rebuild_module_info = DetectType.CLEAN_BUILD
    elif self._args.rebuild_module_info:
      rebuild_module_info = DetectType.REBUILD_MODULE_INFO
    elif self._module_info_rebuild_required:
      rebuild_module_info = DetectType.SMART_REBUILD_MODULE_INFO
    metrics.LocalDetectEvent(
        detect_type=rebuild_module_info, result=int(round(self._build_duration))
    )
    if not success:
      return ExitCode.BUILD_FAILURE

  def _run_test_step(self) -> int:
    """Runs the test step.

    Returns:
        Exit code.
    """
    # Stop before calling Tradefed if the tests require a device but none is
    # available.
    _validate_adb_devices(self._args, self._test_infos)

    test_start = time.time()
    # Only send this duration to metrics when there is no build step.
    if not self._steps.build:
      _init_and_find = time.time() - self._invocation_begin_time
      logging.debug('Initiation and finding tests took %ss', _init_and_find)
      metrics.LocalDetectEvent(
          detect_type=DetectType.INIT_AND_FIND_MS,
          result=int(round(_init_and_find * 1000)),
      )

    tests_exit_code = self._test_execution_plan.execute()

    if self._args.experimental_coverage:
      coverage.generate_coverage_report(
          self._results_dir,
          self._test_infos,
          self._mod_info,
          self._test_execution_plan.extra_args.get(constants.HOST, False),
          self._args.code_under_test,
      )

    metrics.RunTestsFinishEvent(
        duration=metrics_utils.convert_duration(time.time() - test_start)
    )
    preparation_time = atest_execution_info.preparation_time(test_start)
    if preparation_time:
      # Send the preparation time only if it's set.
      metrics.RunnerFinishEvent(
          duration=metrics_utils.convert_duration(preparation_time),
          success=True,
          runner_name=constants.TF_PREPARATION,
          test=[],
      )

    return tests_exit_code

  def _send_start_event(self) -> None:
    metrics_utils.send_start_event(
        command_line=' '.join(self._argv),
        test_references=self._args.tests,
        cwd=os.getcwd(),
        operating_system=(
            f'{platform.platform()}:{platform.python_version()}/'
            f'{atest_utils.get_manifest_branch(True)}:'
            f'{atest_utils.get_atest_version()}'
        ),
        source_root=os.environ.get('ANDROID_BUILD_TOP', ''),
        hostname=platform.node(),
    )

  def _disable_bazel_mode_if_unsupported(self) -> None:
    if (
        atest_utils.is_test_mapping(self._args)
        or self._args.experimental_coverage
    ):
      logging.debug('Running test mapping or coverage, disabling bazel mode.')
      atest_utils.colorful_print(
          'Not running using bazel-mode.', constants.YELLOW
      )
      self._args.bazel_mode = False

  def _run_all_steps(self) -> int:
    """Executes the atest script.

    Returns:
        Exit code.
    """
    self._invocation_begin_time = time.time()

    self._update_build_env()

    invalid_arg_exit_code = self._check_envs_and_args()
    if invalid_arg_exit_code is not None:
      sys.exit(invalid_arg_exit_code)

    self._send_start_event()

    no_action_exit_code = self._check_no_action_argument()
    if no_action_exit_code is not None:
      sys.exit(no_action_exit_code)

    if self._args.list_modules:
      return self._handle_list_modules()

    self._disable_bazel_mode_if_unsupported()

    if self._args.dry_run:
      return self._handle_dry_run()

    self._start_acloud_if_requested()

    error_code = self._load_test_info_and_execution_plan()
    if error_code is not None:
      return error_code

    if self._steps.build:
      error_code = self._run_build_step()
      if error_code is not None:
        return error_code

    acloud_status = self._check_acloud_status()
    if acloud_status:
      return acloud_status

    self._update_device_if_requested()

    if self._steps.test and self._run_test_step() != ExitCode.SUCCESS:
      return ExitCode.TEST_FAILURE

    return ExitCode.SUCCESS


class _TestExecutionPlan(abc.ABC):
  """Represents how an Atest invocation's tests will execute."""

  @staticmethod
  def create(
      args: argparse.Namespace,
      test_infos: List[test_info.TestInfo],
      results_dir: str,
      mod_info: module_info.ModuleInfo,
  ) -> _TestExecutionPlan:
    """Creates a plan to execute the tests.

    Args:
        args: An argparse.Namespace instance holding parsed args.
        test_infos: A list of instances of TestInfo.
        results_dir: A directory which stores the ATest execution information.
        mod_info: An instance of ModuleInfo.

    Returns:
        An instance of _TestExecutionPlan.
    """

    if is_from_test_mapping(test_infos):
      return _TestMappingExecutionPlan.create(
          args=args,
          test_infos=test_infos,
          results_dir=results_dir,
          mod_info=mod_info,
      )

    return _TestModuleExecutionPlan.create(
        args=args,
        test_infos=test_infos,
        results_dir=results_dir,
        mod_info=mod_info,
    )

  def __init__(
      self,
      args: argparse.Namespace,
      extra_args: Dict[str, Any],
      test_infos: List[test_info.TestInfo],
  ):
    self._args = args
    self._extra_args = extra_args
    self._test_infos = test_infos

  @property
  def extra_args(self) -> Dict[str, Any]:
    return self._extra_args

  @abc.abstractmethod
  def execute(self) -> ExitCode:
    """Executes all test runner invocations in this plan."""

  @abc.abstractmethod
  def required_build_targets(self) -> Set[str]:
    """Returns the list of build targets required by this plan."""

  @abc.abstractmethod
  def requires_device_update(self) -> bool:
    """Checks whether this plan requires device update."""


class _TestMappingExecutionPlan(_TestExecutionPlan):
  """A plan to execute Test Mapping tests."""

  def __init__(
      self,
      args: argparse.Namespace,
      extra_args: Dict[str, Any],
      test_infos: List[test_info.TestInfo],
      test_type_to_invocations: Dict[str, List[TestRunnerInvocation]],
  ):
    super().__init__(args, extra_args, test_infos)
    self._test_type_to_invocations = test_type_to_invocations

  @staticmethod
  def create(
      args: argparse.Namespace,
      test_infos: List[test_info.TestInfo],
      results_dir: str,
      mod_info: module_info.ModuleInfo,
  ) -> _TestMappingExecutionPlan:
    """Creates an instance of _TestMappingExecutionPlan.

    Args:
        args: An argparse.Namespace instance holding parsed args.
        test_infos: A list of instances of TestInfo.
        results_dir: A directory which stores the ATest execution information.
        mod_info: An instance of ModuleInfo.

    Returns:
        An instance of _TestMappingExecutionPlan.
    """

    device_test_infos, host_test_infos = _split_test_mapping_tests(test_infos)
    _validate_tm_tests_exec_mode(args, device_test_infos, host_test_infos)
    extra_args = get_extra_args(args)

    # TODO: Switch to an approach that puts constants.CUSTOM_ARGS at the end
    # of the command to make sure that customized args can override default
    # options.
    # For TEST_MAPPING, set the timeout to 600000ms.
    custom_timeout = False
    for custom_args in args.custom_args:
      if '-timeout' in custom_args:
        custom_timeout = True

    if args.test_timeout is None and not custom_timeout:
      extra_args.update({constants.TEST_TIMEOUT: 600000})
      logging.debug(
          'Set test timeout to %sms to align with TEST_MAPPING.',
          extra_args.get(constants.TEST_TIMEOUT),
      )

    def create_invocations(runner_extra_args, runner_test_infos):
      return test_runner_handler.create_test_runner_invocations(
          test_infos=runner_test_infos,
          results_dir=results_dir,
          mod_info=mod_info,
          extra_args=runner_extra_args,
          minimal_build=args.minimal_build,
      )

    test_type_to_invocations = collections.OrderedDict()
    if extra_args.get(constants.DEVICE_ONLY):
      atest_utils.colorful_print(
          'Option `--device-only` specified. Skip running deviceless tests.',
          constants.MAGENTA,
      )
    else:
      # The `host` option needs to be set to True to run host side tests.
      host_extra_args = extra_args.copy()
      host_extra_args[constants.HOST] = True
      test_type_to_invocations.setdefault(HOST_TESTS, []).extend(
          create_invocations(host_extra_args, host_test_infos)
      )

    if extra_args.get(constants.HOST):
      atest_utils.colorful_print(
          'Option `--host` specified. Skip running device tests.',
          constants.MAGENTA,
      )
    else:
      test_type_to_invocations.setdefault(DEVICE_TESTS, []).extend(
          create_invocations(extra_args, device_test_infos)
      )

    return _TestMappingExecutionPlan(
        args=args,
        extra_args=extra_args,
        test_infos=test_infos,
        test_type_to_invocations=test_type_to_invocations,
    )

  def requires_device_update(self) -> bool:
    return any(
        inv.requires_device_update()
        for inv in itertools.chain.from_iterable(
            self._test_type_to_invocations.values()
        )
    )

  def required_build_targets(self) -> Set[str]:
    build_targets = set()
    for invocation in itertools.chain.from_iterable(
        self._test_type_to_invocations.values()
    ):
      build_targets |= invocation.get_test_runner_reqs()

    return build_targets

  def execute(self) -> ExitCode:
    """Run all tests in TEST_MAPPING files.

    Returns:
        Exit code.
    """

    test_results = []
    for test_type, invocations in self._test_type_to_invocations.items():
      tests = list(
          itertools.chain.from_iterable(i.test_infos for i in invocations)
      )
      if not tests:
        continue
      header = RUN_HEADER_FMT % {TEST_COUNT: len(tests), TEST_TYPE: test_type}
      atest_utils.colorful_print(header, constants.MAGENTA)
      logging.debug('\n'.join([str(info) for info in tests]))

      reporter = result_reporter.ResultReporter(
          collect_only=self._extra_args.get(constants.COLLECT_TESTS_ONLY),
          wait_for_debugger=atest_configs.GLOBAL_ARGS.wait_for_debugger,
          args=self._args,
          test_infos=self._test_infos,
      )
      reporter.print_starting_text()

      tests_exit_code = ExitCode.SUCCESS
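      # Accumulate exit codes with bitwise OR so a failure from any
      # invocation is preserved in the final result.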
      for invocation in invocations:
        tests_exit_code |= invocation.run_all_tests(reporter)

      atest_execution_info.AtestExecutionInfo.result_reporters.append(reporter)
      test_results.append((tests_exit_code, reporter, test_type))

    all_tests_exit_code = ExitCode.SUCCESS
    failed_tests = []
    for tests_exit_code, reporter, test_type in test_results:
      atest_utils.colorful_print(
          RESULT_HEADER_FMT % {TEST_TYPE: test_type}, constants.MAGENTA
      )
      result = tests_exit_code | reporter.print_summary()
      if result:
        failed_tests.append(test_type)
      all_tests_exit_code |= result

    # List the failed tests at the end as a reminder.
    if failed_tests:
      atest_utils.colorful_print(
          atest_utils.delimiter('=', 30, prenl=1), constants.YELLOW
      )
      atest_utils.colorful_print('\nFollowing tests failed:', constants.MAGENTA)
      for failure in failed_tests:
        atest_utils.colorful_print(failure, constants.RED)

    return all_tests_exit_code


class _TestModuleExecutionPlan(_TestExecutionPlan):
  """A plan to execute the test modules explicitly passed on the command-line."""

  def __init__(
      self,
      args: argparse.Namespace,
      extra_args: Dict[str, Any],
      test_infos: List[test_info.TestInfo],
      test_runner_invocations: List[TestRunnerInvocation],
  ):
    super().__init__(args, extra_args, test_infos)
    self._test_runner_invocations = test_runner_invocations

  @staticmethod
  def create(
      args: argparse.Namespace,
      test_infos: List[test_info.TestInfo],
      results_dir: str,
      mod_info: module_info.ModuleInfo,
  ) -> _TestModuleExecutionPlan:
    """Creates an instance of _TestModuleExecutionPlan.

    Args:
        args: An argparse.Namespace instance holding parsed args.
        test_infos: A list of instances of TestInfo.
        results_dir: A directory which stores the ATest execution information.
        mod_info: An instance of ModuleInfo.

    Returns:
        An instance of _TestModuleExecutionPlan.
    """

    if not args.dry_run:
      _validate_exec_mode(args, test_infos)

    # _validate_exec_mode appends --host automatically for pure host-side
    # tests, so extra_args must be re-computed afterwards.
    extra_args = get_extra_args(args)

    invocations = test_runner_handler.create_test_runner_invocations(
        test_infos=test_infos,
        results_dir=results_dir,
        mod_info=mod_info,
        extra_args=extra_args,
        minimal_build=args.minimal_build,
    )

    return _TestModuleExecutionPlan(
        args=args,
        extra_args=extra_args,
        test_infos=test_infos,
        test_runner_invocations=invocations,
    )

  def requires_device_update(self) -> bool:
    return any(
        inv.requires_device_update() for inv in self._test_runner_invocations
    )

  def required_build_targets(self) -> Set[str]:
    build_targets = set()
    for test_runner_invocation in self._test_runner_invocations:
      build_targets |= test_runner_invocation.get_test_runner_reqs()

    return build_targets

  def execute(self) -> ExitCode:

    reporter = result_reporter.ResultReporter(
        collect_only=self.extra_args.get(constants.COLLECT_TESTS_ONLY),
        wait_for_debugger=atest_configs.GLOBAL_ARGS.wait_for_debugger,
        args=self._args,
        test_infos=self._test_infos,
    )
    reporter.print_starting_text()

    exit_code = ExitCode.SUCCESS
    for invocation in self._test_runner_invocations:
      exit_code |= invocation.run_all_tests(reporter)

    atest_execution_info.AtestExecutionInfo.result_reporters.append(reporter)
    return reporter.print_summary() | exit_code


if __name__ == '__main__':
  _AtestMain(sys.argv).run()