xref: /aosp_15_r20/tools/asuite/atest/test_runners/atest_tf_test_runner.py (revision c2e18aaa1096c836b086f94603d04f4eb9cf37f5)
1# Copyright 2017, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Atest Tradefed test runner class."""
16
17# pylint: disable=too-many-lines
18
19from __future__ import annotations
20from __future__ import print_function
21
22from abc import ABC, abstractmethod
23import dataclasses
24import enum
25from functools import partial
26import json
27import logging
28import os
29from pathlib import Path
30import re
31import select
32import shutil
33import socket
34import threading
35import time
36from typing import Any, Dict, List, Set, Tuple
37
38from atest import atest_configs
39from atest import atest_error
40from atest import atest_utils
41from atest import constants
42from atest import module_info
43from atest import result_reporter
44from atest import rollout_control
45from atest.atest_enum import DetectType, ExitCode
46from atest.coverage import coverage
47from atest.logstorage import logstorage_utils
48from atest.metrics import metrics
49from atest.test_finders import test_finder_utils
50from atest.test_finders import test_info
51from atest.test_finders.test_info import TestInfo
52from atest.test_runner_invocation import TestRunnerInvocation
53from atest.test_runners import test_runner_base as trb
54from atest.test_runners.event_handler import EventHandler
55
56POLL_FREQ_SECS = 10
57SOCKET_HOST = '127.0.0.1'
58SOCKET_QUEUE_MAX = 1
59SOCKET_BUFFER = 4096
60SELECT_TIMEOUT = 0.5
61
62# Env key for rolling subprocess output window height.
63_ROLLING_OUTPUT_WINDOW_HEIGHT_ENV_KEY = 'ATEST_ROLLING_OUTPUT_WINDOW_HEIGHT'
64
65# Socket Events of form FIRST_EVENT {JSON_DATA}\nSECOND_EVENT {JSON_DATA}
66# EVENT_RE has groups for the name and the data. "." does not match \n.
67EVENT_RE = re.compile(
68    r'\n*(?P<event_name>[A-Z_]+) (?P<json_data>{.*})(?=\n|.)*'
69)
70
71# Remove aapt from build dependency, use prebuilt version instead.
72EXEC_DEPENDENCIES = ('adb', 'fastboot')
73
74LOG_FOLDER_NAME = 'log'
75
76_INTEGRATION_FINDERS = frozenset(['', 'INTEGRATION', 'INTEGRATION_FILE_PATH'])
77
78# AAPT binary name
79_AAPT = 'aapt'
80
81# The exist code mapping of tradefed.
82_TF_EXIT_CODE = [
83    'NO_ERROR',
84    'CONFIG_EXCEPTION',
85    'NO_BUILD',
86    'DEVICE_UNRESPONSIVE',
87    'DEVICE_UNAVAILABLE',
88    'FATAL_HOST_ERROR',
89    'THROWABLE_EXCEPTION',
90    'NO_DEVICE_ALLOCATED',
91    'WRONG_JAVA_VERSION',
92]
93
94# The environment variable for TF preparer incremental setup.
95_INCREMENTAL_SETUP_KEY = 'TF_PREPARER_INCREMENTAL_SETUP'
96
97
98class Error(Exception):
99  """Module-level error."""
100
101
102class TradeFedExitError(Error):
103  """Raised when TradeFed exists before test run has finished."""
104
105  def __init__(self, exit_code):
106    super().__init__()
107    self.exit_code = exit_code
108
109  def __str__(self):
110    tf_error_reason = self._get_exit_reason(self.exit_code)
111    return (
112        'TradeFed subprocess exited early with exit code='
113        f'{self.exit_code}({tf_error_reason}).'
114    )
115
116  def _get_exit_reason(self, exit_code):
117    if 0 < exit_code < len(_TF_EXIT_CODE):
118      return atest_utils.mark_red(_TF_EXIT_CODE[exit_code])
119    return 'Unknown exit status'
120
121
122class AtestTradefedTestRunner(trb.TestRunnerBase):
123  """TradeFed Test Runner class."""
124
125  NAME = 'AtestTradefedTestRunner'
126  EXECUTABLE = 'atest_tradefed.sh'
127  # Common base template used by all TF tests
128  _TF_LOCAL_MIN = 'template/atest_local_min'
129  # Base template used by the device tests (tests requires device to run)
130  _TF_DEVICE_TEST_TEMPLATE = 'template/atest_device_test_base'
131  # Base template used by the deviceless tests
132  _TF_DEVICELESS_TEST_TEMPLATE = 'template/atest_deviceless_test_base'
133  # Use --no-enable-granular-attempts to control reporter replay behavior.
134  # TODO(b/142630648): Enable option enable-granular-attempts
135  # in sharding mode.
136  _LOG_ARGS = (
137      '--{log_root_option_name}={log_path} '
138      '{log_ext_option} '
139      '--no-enable-granular-attempts'
140  )
141  _RUN_CMD = (
142      '{env} {exe} {template} '
143      '--template:map test=atest '
144      '--template:map log_saver={log_saver} '
145      '{tf_customize_template} {log_args} {args}'
146  )
147  _BUILD_REQ = {'tradefed-core'}
148  _RERUN_OPTION_GROUP = [
149      constants.ITERATIONS,
150      constants.RERUN_UNTIL_FAILURE,
151      constants.RETRY_ANY_FAILURE,
152  ]
153
154  # We're using a class attribute because we're recreating runner instances
155  # for different purposes throughout an invocation.
156  # TODO(b/283352341): Remove this once we refactor to have runner instances.
157  _MINIMAL_BUILD_TARGETS = set()
158
159  def __init__(
160      self,
161      results_dir: str,
162      extra_args: Dict[str, Any],
163      mod_info: module_info.ModuleInfo = None,
164      minimal_build: bool = False,
165      **kwargs,
166  ):
167    """Init stuff for base class."""
168    super().__init__(results_dir, **kwargs)
169    self.module_info = mod_info
170    self.log_path = os.path.join(results_dir, LOG_FOLDER_NAME)
171    # (b/275537997) results_dir could be '' in test_runner_handler; only
172    # mkdir when it is invoked by run_tests.
173    if results_dir:
174      Path(self.log_path).mkdir(parents=True, exist_ok=True)
175    self.log_args = {
176        'log_root_option_name': constants.LOG_ROOT_OPTION_NAME,
177        'log_ext_option': constants.LOG_SAVER_EXT_OPTION,
178        'log_path': self.log_path,
179        'proto_path': os.path.join(
180            self.results_dir, constants.ATEST_TEST_RECORD_PROTO
181        ),
182    }
183    self.run_cmd_dict = {
184        'env': '',
185        'exe': self.EXECUTABLE,
186        'template': self._TF_LOCAL_MIN,
187        'log_saver': constants.ATEST_TF_LOG_SAVER,
188        'tf_customize_template': '',
189        'args': '',
190        'log_args': self._LOG_ARGS.format(**self.log_args),
191    }
192    self.root_dir = os.environ.get(constants.ANDROID_BUILD_TOP)
193    self._is_host_enabled = extra_args.get(constants.HOST, False)
194    self._minimal_build = minimal_build
195    logging.debug('Enable minimal build: %s' % self._minimal_build)
196    metrics.LocalDetectEvent(
197        detect_type=DetectType.IS_MINIMAL_BUILD, result=int(self._minimal_build)
198    )
199
200  def requires_device_update(
201      self, test_infos: List[test_info.TestInfo]
202  ) -> bool:
203    """Checks whether this runner requires device update."""
204
205    requires_device_update = False
206    for info in test_infos:
207      test = self._create_test(info)
208      requires_device_update |= test.requires_device_update()
209
210    return requires_device_update
211
212  # @typing.override
213  def create_invocations(
214      self,
215      extra_args: Dict[str, Any],
216      test_infos: List[test_info.TestInfo],
217  ) -> List[TestRunnerInvocation]:
218    """Create separate test runner invocations for device and deviceless tests.
219
220    Args:
221        extra_args: Dict of extra args to pass to the invocations
222        test_infos: A list of TestInfos.
223
224    Returns:
225        A list of TestRunnerInvocation instances.
226    """
227    invocations = []
228    device_test_infos, deviceless_test_infos = self._partition_tests(test_infos)
229    if deviceless_test_infos:
230      extra_args_for_deviceless_test = extra_args.copy()
231      extra_args_for_deviceless_test.update({constants.HOST: True})
232      invocations.append(
233          TestRunnerInvocation(
234              test_runner=self,
235              extra_args=extra_args_for_deviceless_test,
236              test_infos=deviceless_test_infos,
237          )
238      )
239    if device_test_infos:
240      extra_args_for_device_test = extra_args.copy()
241      if rollout_control.tf_preparer_incremental_setup.is_enabled():
242        extra_args_for_device_test.update({_INCREMENTAL_SETUP_KEY: True})
243      invocations.append(
244          TestRunnerInvocation(
245              test_runner=self,
246              extra_args=extra_args_for_device_test,
247              test_infos=device_test_infos,
248          )
249      )
250
251    return invocations
252
253  def _partition_tests(
254      self,
255      test_infos: List[test_info.TestInfo],
256  ) -> (List[test_info.TestInfo], List[test_info.TestInfo]):
257    """Partition input tests into two lists based on whether it requires device.
258
259    Args:
260        test_infos: A list of TestInfos.
261
262    Returns:
263        Two lists one contains device tests the other contains deviceless tests.
264    """
265    device_test_infos = []
266    deviceless_test_infos = []
267
268    for info in test_infos:
269      test = self._create_test(info)
270      if test.requires_device():
271        device_test_infos.append(info)
272      else:
273        deviceless_test_infos.append(info)
274
275    return device_test_infos, deviceless_test_infos
276
277  def _try_set_gts_authentication_key(self):
278    """Set GTS authentication key if it is available or exists.
279
280    Strategy:
281        Get APE_API_KEY from os.environ:
282            - If APE_API_KEY is already set by user -> do nothing.
283        Get the APE_API_KEY from constants:
284            - If the key file exists -> set to env var.
285        If APE_API_KEY isn't set and the key file doesn't exist:
286            - Warn user some GTS tests may fail without authentication.
287    """
288    if os.environ.get('APE_API_KEY'):
289      logging.debug('APE_API_KEY is set by developer.')
290      return
291    ape_api_key = constants.GTS_GOOGLE_SERVICE_ACCOUNT
292    key_path = os.path.join(self.root_dir, ape_api_key)
293    if ape_api_key and os.path.exists(key_path):
294      logging.debug('Set APE_API_KEY: %s', ape_api_key)
295      os.environ['APE_API_KEY'] = key_path
296    else:
297      logging.debug(
298          'APE_API_KEY not set, some GTS tests may fail without authentication.'
299      )
300
301  def run_tests(self, test_infos, extra_args, reporter):
302    """Run the list of test_infos. See base class for more.
303
304    Args:
305        test_infos: A list of TestInfos.
306        extra_args: Dict of extra args to add to test run.
307        reporter: An instance of result_report.ResultReporter.
308
309    Returns:
310        0 if tests succeed, non-zero otherwise.
311    """
312    logging.debug('TF test runner running tests %s', test_infos)
313    reporter.log_path = self.log_path
314    reporter.rerun_options = self._extract_rerun_options(extra_args)
315    # Set google service key if it's available or found before
316    # running tests.
317    self._try_set_gts_authentication_key()
318    result = 0
319    upload_start = time.time()
320    invocation_properties = {'atest_run_id': metrics.get_run_id()}
321
322    # Set crystalball_ingest property if there are performance tests.
323    is_perf_tests = False
324    for info in test_infos:
325      if 'performance-tests' in info.compatibility_suites:
326        is_perf_tests = True
327        break
328    if is_perf_tests:
329      invocation_properties['crystalball_ingest'] = 'yes'
330
331    creds, inv = (
332        logstorage_utils.do_upload_flow(extra_args, invocation_properties)
333        if logstorage_utils.is_upload_enabled(extra_args)
334        else (None, None)
335    )
336    metrics.LocalDetectEvent(
337        detect_type=DetectType.UPLOAD_FLOW_MS,
338        result=int((time.time() - upload_start) * 1000),
339    )
340    try:
341      verify_key = atest_utils.get_verify_key(
342          [test_infos[0].test_name], extra_args
343      )
344      # Change CWD to repo root to ensure TF can find prebuilt SDKs
345      # for some path-sensitive tests like robolectric.
346      os.chdir(os.path.abspath(os.getenv(constants.ANDROID_BUILD_TOP)))
347
348      # Copy symbols if there are tests belong to native test.
349      self._handle_native_tests(test_infos)
350
351      if os.getenv(trb.OLD_OUTPUT_ENV_VAR):
352        result = self.run_tests_raw(test_infos, extra_args, reporter)
353      else:
354        result = self.run_tests_pretty(test_infos, extra_args, reporter)
355    except atest_error.DryRunVerificationError as e:
356      atest_utils.colorful_print(str(e), constants.RED)
357      return ExitCode.VERIFY_FAILURE
358    finally:
359      if inv:
360        try:
361          logging.disable(logging.INFO)
362          # Always set invocation status to completed due to the ATest
363          # handle whole process by its own.
364          inv['schedulerState'] = 'completed'
365          logstorage_utils.BuildClient(creds).update_invocation(inv)
366          reporter.test_result_link = (
367              constants.RESULT_LINK % inv['invocationId']
368          )
369        finally:
370          logging.disable(logging.NOTSET)
371    return result
372
373  def run_tests_raw(self, test_infos, extra_args, reporter):
374    """Run the list of test_infos. See base class for more.
375
376    Args:
377        test_infos: A list of TestInfos.
378        extra_args: Dict of extra args to add to test run.
379        reporter: An instance of result_report.ResultReporter.
380
381    Returns:
382        0 if tests succeed, non-zero otherwise.
383    """
384    reporter.register_unsupported_runner(self.NAME)
385
386    ret_code = ExitCode.SUCCESS
387    run_cmds = self.generate_run_commands(test_infos, extra_args)
388    logging.debug('Running test: %s', run_cmds[0])
389    subproc = self.run(
390        run_cmds[0],
391        output_to_stdout=True,
392        env_vars=self.generate_env_vars(extra_args),
393    )
394    ret_code |= self.wait_for_subprocess(subproc)
395    return ret_code
396
397  def run_tests_pretty(self, test_infos, extra_args, reporter):
398    """Run the list of test_infos. See base class for more.
399
400    Args:
401        test_infos: A list of TestInfos.
402        extra_args: Dict of extra args to add to test run.
403        reporter: An instance of result_report.ResultReporter.
404
405    Returns:
406        0 if tests succeed, non-zero otherwise.
407    """
408    ret_code = ExitCode.SUCCESS
409    server = self._start_socket_server()
410    run_cmds = self.generate_run_commands(
411        test_infos, extra_args, server.getsockname()[1]
412    )
413    is_rolling_output = (
414        not extra_args.get(constants.VERBOSE, False)
415        and atest_utils.is_atty_terminal()
416        and rollout_control.rolling_tf_subprocess_output.is_enabled()
417    )
418
419    logging.debug('Running test: %s', run_cmds[0])
420    subproc = self.run(
421        run_cmds[0],
422        output_to_stdout=extra_args.get(constants.VERBOSE, False),
423        env_vars=self.generate_env_vars(extra_args),
424        rolling_output_lines=is_rolling_output,
425    )
426
427    if is_rolling_output:
428      height = os.environ.get(_ROLLING_OUTPUT_WINDOW_HEIGHT_ENV_KEY, None)
429      if height:
430        try:
431          height = int(height)
432        except ValueError:
433          atest_utils.print_and_log_warning(
434              'Invalid rolling output window height: %s', height
435          )
436      threading.Thread(
437          target=atest_utils.stream_io_output,
438          args=(
439              subproc.stdout,
440              height if height else atest_utils.DEFAULT_OUTPUT_ROLLING_LINES,
441          ),
442      ).start()
443
444    self.handle_subprocess(
445        subproc,
446        partial(self._start_monitor, server, subproc, reporter, extra_args),
447    )
448    server.close()
449    ret_code |= self.wait_for_subprocess(subproc)
450    return ret_code
451
452  # pylint: disable=too-many-branches
453  # pylint: disable=too-many-locals
454  def _start_monitor(self, server, tf_subproc, reporter, extra_args):
455    """Polling and process event.
456
457    Args:
458        server: Socket server object.
459        tf_subproc: The tradefed subprocess to poll.
460        reporter: Result_Reporter object.
461        extra_args: Dict of extra args to add to test run.
462    """
463    inputs = [server]
464    event_handlers = {}
465    data_map = {}
466    inv_socket = None
467    while inputs:
468      try:
469        readable, _, _ = select.select(inputs, [], [], SELECT_TIMEOUT)
470        for socket_object in readable:
471          if socket_object is server:
472            conn, addr = socket_object.accept()
473            logging.debug('Accepted connection from %s', addr)
474            conn.setblocking(False)
475            inputs.append(conn)
476            data_map[conn] = ''
477            # The First connection should be invocation
478            # level reporter.
479            if not inv_socket:
480              inv_socket = conn
481          else:
482            # Count invocation level reporter events
483            # without showing real-time information.
484            if inv_socket == socket_object:
485              reporter.silent = True
486              event_handler = event_handlers.setdefault(
487                  socket_object, EventHandler(reporter, self.NAME)
488              )
489            else:
490              event_handler = event_handlers.setdefault(
491                  socket_object,
492                  EventHandler(
493                      result_reporter.ResultReporter(
494                          collect_only=extra_args.get(
495                              constants.COLLECT_TESTS_ONLY
496                          ),
497                      ),
498                      self.NAME,
499                  ),
500              )
501            recv_data = self._process_connection(
502                data_map, socket_object, event_handler
503            )
504            if not recv_data:
505              inputs.remove(socket_object)
506              socket_object.close()
507      finally:
508        # Subprocess ended and all socket clients were closed.
509        if tf_subproc.poll() is not None and len(inputs) == 1:
510          inputs.pop().close()
511          if not reporter.all_test_results:
512            if atest_configs.GLOBAL_ARGS.user_type:
513              atest_utils.colorful_print(
514                  "The test module doesn't support "
515                  f"'{atest_configs.GLOBAL_ARGS.user_type}' "
516                  'user type, please check test config.',
517                  constants.RED,
518              )
519            atest_utils.colorful_print(
520                r'No test results available. TradeFed did not find'
521                r' any test cases to run. This is possibly due to'
522                r' the no tests matching the current test filters'
523                r' or misconfigured AndroidTest.xml. Test Logs'
524                r' are saved in '
525                f'{reporter.log_path}.',
526                constants.RED,
527                constants.WHITE,
528            )
529          if not data_map:
530            metrics.LocalDetectEvent(
531                detect_type=DetectType.TF_EXIT_CODE,
532                result=tf_subproc.returncode,
533            )
534            raise TradeFedExitError(tf_subproc.returncode)
535          self._handle_log_associations(event_handlers)
536
537  def _process_connection(self, data_map, conn, event_handler):
538    """Process a socket connection between TF and ATest.
539
540    Expect data of form EVENT_NAME {JSON_DATA}.  Multiple events will be
541    \n deliminated.  Need to buffer data in case data exceeds socket
542    buffer.
543    E.q.
544        TEST_RUN_STARTED {runName":"hello_world_test","runAttempt":0}\n
545        TEST_STARTED {"start_time":2172917, "testName":"PrintHelloWorld"}\n
546    Args:
547        data_map: The data map of all connections.
548        conn: Socket connection.
549        event_handler: EventHandler object.
550
551    Returns:
552        True if conn.recv() has data , False otherwise.
553    """
554    # Set connection into blocking mode.
555    conn.settimeout(None)
556    data = conn.recv(SOCKET_BUFFER)
557    if isinstance(data, bytes):
558      data = data.decode()
559    logging.debug('received: %s', data)
560    if data:
561      data_map[conn] += data
562      while True:
563        match = EVENT_RE.match(data_map[conn])
564        if not match:
565          break
566        try:
567          event_data = json.loads(match.group('json_data'))
568        except ValueError:
569          logging.debug('Json incomplete, wait for more data')
570          break
571        event_name = match.group('event_name')
572        event_handler.process_event(event_name, event_data)
573        data_map[conn] = data_map[conn][match.end() :]
574    return bool(data)
575
576  def _start_socket_server(self):
577    """Start a TCP server."""
578    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
579    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
580    # Port 0 lets the OS pick an open port between 1024 and 65535.
581    server.bind((SOCKET_HOST, 0))
582    server.listen(SOCKET_QUEUE_MAX)
583    server.settimeout(POLL_FREQ_SECS)
584    logging.debug('Socket server started on port %s', server.getsockname()[1])
585    return server
586
587  def generate_env_vars(self, extra_args):
588    """Convert extra args and test_infos into env vars.
589
590    Args:
591        extra_args: Dict of extra args to add to test run.
592        test_infos: A list of TestInfos.
593
594    Returns:
595        A dict modified from os.getenv.copy().
596    """
597    env_vars = os.environ.copy()
598    if constants.TF_GLOBAL_CONFIG and is_log_upload_enabled(extra_args):
599      env_vars['TF_GLOBAL_CONFIG'] = constants.TF_GLOBAL_CONFIG
600    debug_port = extra_args.get(constants.TF_DEBUG, '')
601    if debug_port:
602      env_vars['TF_DEBUG'] = 'true'
603      env_vars['TF_DEBUG_PORT'] = str(debug_port)
604
605    filtered_paths = []
606    for path in str(env_vars.get('PYTHONPATH', '')).split(':'):
607      # TODO (b/166216843) Remove the hacky PYTHON path workaround.
608      if (
609          str(path).startswith('/tmp/Soong.python_')
610          and str(path).find('googleapiclient') > 0
611      ):
612        continue
613      filtered_paths.append(path)
614    if filtered_paths:
615      env_vars['PYTHONPATH'] = ':'.join(filtered_paths)
616
617    # Use prebuilt aapt if there's no aapt under android system path which
618    # is aligned with build system.
619    # https://android.googlesource.com/platform/build/+/master/core/config.mk#529
620    if self._is_missing_exec(_AAPT):
621      prebuilt_aapt = Path.joinpath(
622          atest_utils.get_prebuilt_sdk_tools_dir(), _AAPT
623      )
624      if os.path.exists(prebuilt_aapt):
625        env_vars['PATH'] = str(prebuilt_aapt.parent) + ':' + env_vars['PATH']
626
627    # Add an env variable for the classpath that only contains the host jars
628    # required for the tests we'll be running.
629    if self._minimal_build:
630      self._generate_host_jars_env_var(env_vars)
631
632    return env_vars
633
634  def _generate_host_jars_env_var(self, env_vars):
635    def is_host_jar(p):
636      return p.suffix == '.jar' and p.is_relative_to(
637          Path(os.getenv(constants.ANDROID_HOST_OUT))
638      )
639
640    all_host_jars = []
641
642    for target in AtestTradefedTestRunner._MINIMAL_BUILD_TARGETS:
643      if target.variant != Variant.HOST:
644        continue
645      # Only use the first host jar because the same jar may be installed
646      # to multiple places.
647      module_host_jars = [
648          p
649          for p in self.module_info.get_installed_paths(target.module_name)
650          if is_host_jar(p)
651      ]
652      all_host_jars.extend(
653          [str(module_host_jars[0])] if module_host_jars else []
654      )
655
656    env_vars['ATEST_HOST_JARS'] = ':'.join(set(all_host_jars))
657    logging.debug(
658        'Set env ATEST_HOST_JARS: %s.', env_vars.get('ATEST_HOST_JARS')
659    )
660
661  # pylint: disable=unnecessary-pass
662  # Please keep above disable flag to ensure host_env_check is overridden.
663  def host_env_check(self):
664    """Check that host env has everything we need.
665
666    We actually can assume the host env is fine because we have the same
667    requirements that atest has. Update this to check for android env vars
668    if that changes.
669    """
670    pass
671
672  @staticmethod
673  def _is_missing_exec(executable):
674    """Check if system build executable is available.
675
676    Args:
677        executable: Executable we are checking for.
678
679    Returns:
680        True if executable is missing, False otherwise.
681    """
682    output = shutil.which(executable)
683    if not output:
684      return True
685    # TODO: Check if there is a clever way to determine if system adb is
686    # good enough.
687    root_dir = os.environ.get(constants.ANDROID_BUILD_TOP, '')
688    return os.path.commonprefix([output, root_dir]) != root_dir
689
690  def _use_minimal_build(self, test_infos: List[test_info.TestInfo]) -> bool:
691
692    if not self._minimal_build:
693      return False
694
695    unsupported = set()
696    for t_info in test_infos:
697      if t_info.test_finder in [
698          'CONFIG',
699          'INTEGRATION',
700          'INTEGRATION_FILE_PATH',
701      ]:
702        unsupported.add(t_info.test_name)
703      # For ltp and kselftest, keep it as no-minimal-build.
704      elif t_info.test_name in (
705          constants.REQUIRED_LTP_TEST_MODULES
706          + constants.REQUIRED_KSELFTEST_TEST_MODULES
707      ):
708        unsupported.add(t_info.test_name)
709
710    if not unsupported:
711      return True
712
713    atest_utils.print_and_log_warning(
714        'Minimal build was disabled because the following tests do not support'
715        ' it: %s',
716        unsupported,
717    )
718    return False
719
720  def get_test_runner_build_reqs(
721      self, test_infos: List[test_info.TestInfo]
722  ) -> Set[str]:
723    """Return the build requirements.
724
725    Args:
726        test_infos: List of TestInfo.
727
728    Returns:
729        Set of build targets.
730    """
731    if self._use_minimal_build(test_infos):
732      return self._get_test_runner_reqs_minimal(test_infos)
733
734    return self._get_test_runner_build_reqs_maximal(test_infos)
735
736  def _get_test_runner_build_reqs_maximal(
737      self, test_infos: List[test_info.TestInfo]
738  ) -> Set[str]:
739    build_req = self._BUILD_REQ.copy()
740    # Use different base build requirements if google-tf is around.
741    if self.module_info.is_module(constants.GTF_MODULE):
742      build_req = {constants.GTF_TARGET}
743    # Always add ATest's own TF target.
744    build_req.add(constants.ATEST_TF_MODULE)
745    # Add adb if we can't find it.
746    for executable in EXEC_DEPENDENCIES:
747      if self._is_missing_exec(executable):
748        if self.module_info.is_module(executable):
749          build_req.add(executable)
750
751    # Force rebuilt all jars under $ANDROID_HOST_OUT to prevent old version
752    # host jars break the test.
753    build_req |= self._get_host_framework_targets()
754
755    build_req |= trb.gather_build_targets(test_infos)
756    return build_req
757
758  def _get_test_runner_reqs_minimal(
759      self, test_infos: List[test_info.TestInfo]
760  ) -> Set[str]:
761
762    build_targets = set()
763    runtime_targets = set()
764
765    for info in test_infos:
766      test = self._create_test(info)
767      build_targets.update(test.query_build_targets())
768      runtime_targets.update(test.query_runtime_targets())
769
770    AtestTradefedTestRunner._MINIMAL_BUILD_TARGETS = runtime_targets
771
772    build_targets = {t.name() for t in build_targets}
773
774    return build_targets
775
776  def _create_test(self, t_info: test_info.TestInfo) -> Test:
777
778    info = self.module_info.get_module_info(t_info.raw_test_name)
779
780    if not info:
781      # In cases module info does not exist (e.g. TF integration tests), use the
782      # TestInfo to determine the test type. In the future we should ensure all
783      # tests have their corresponding module info and only rely on the module
784      # info to determine the test type.
785      atest_utils.print_and_log_warning(
786          'Could not find module information for %s', t_info.raw_test_name
787      )
788      return self._guess_test_type_for_missing_module(t_info)
789
790    def _select_variant(info):
791      variants = self.module_info.build_variants(info)
792      if len(variants) < 2:
793        return Variant.HOST if variants[0] == 'HOST' else Variant.DEVICE
794      return Variant.HOST if self._is_host_enabled else Variant.DEVICE
795
796    if not self._is_host_enabled and self.module_info.requires_device(info):
797      return DeviceTest(info, _select_variant(info), t_info.mainline_modules)
798
799    return DevicelessTest(info, _select_variant(info))
800
801  def _guess_test_type_for_missing_module(
802      self, t_info: test_info.TestInfo
803  ) -> Test:
804    """Determine the test type (device or deviceless) without module info."""
805    if (
806        not self._is_host_enabled
807        and t_info.get_supported_exec_mode() != constants.DEVICELESS_TEST
808    ):
809      return DeviceTest(None, Variant.DEVICE, t_info.mainline_modules)
810
811    return DevicelessTest(None, Variant.HOST)
812
813  def _get_host_framework_targets(self) -> Set[str]:
814    """Get the build targets for all the existing jars under host framework.
815
816    Returns:
817        A set of build target name under $(ANDROID_HOST_OUT)/framework.
818    """
819    host_targets = set()
820    if not self.module_info:
821      return host_targets
822
823    framework_host_dir = Path(
824        os.environ.get(constants.ANDROID_HOST_OUT)
825    ).joinpath('framework')
826    if framework_host_dir.is_dir():
827      jars = framework_host_dir.glob('*.jar')
828      for jar in jars:
829        if self.module_info.is_module(jar.stem):
830          host_targets.add(jar.stem)
831      logging.debug('Found exist host framework target:%s', host_targets)
832    return host_targets
833
834  def _parse_extra_args(self, test_infos, extra_args):
835    """Convert the extra args into something tf can understand.
836
837    Args:
838        extra_args: Dict of args
839
840    Returns:
841        Tuple of args to append and args not supported.
842    """
843    args_to_append, args_not_supported = extra_args_to_tf_args(
844        extra_args, self.module_info
845    )
846
847    # Set exclude instant app annotation for non-instant mode run.
848    if constants.INSTANT not in extra_args and self._has_instant_app_config(
849        test_infos, self.module_info
850    ):
851      args_to_append.append(constants.TF_TEST_ARG)
852      args_to_append.append(
853          '{tf_class}:{option_name}:{option_value}'.format(
854              tf_class=constants.TF_AND_JUNIT_CLASS,
855              option_name=constants.TF_EXCLUDE_ANNOTATE,
856              option_value=constants.INSTANT_MODE_ANNOTATE,
857          )
858      )
859    # Force append --enable-parameterized-modules if args_to_append has
860    # --module-parameter in args_to_append
861    if constants.TF_MODULE_PARAMETER in args_to_append:
862      if constants.TF_ENABLE_PARAMETERIZED_MODULES not in args_to_append:
863        args_to_append.append(constants.TF_ENABLE_PARAMETERIZED_MODULES)
864    # If all the test config has config with auto enable parameter, force
865    # exclude those default parameters(ex: instant_app, secondary_user)
866    # TODO: (b/228433541) Remove the limitation after the root cause fixed.
867    if len(test_infos) <= 1 and self._is_all_tests_parameter_auto_enabled(
868        test_infos
869    ):
870      if constants.TF_ENABLE_PARAMETERIZED_MODULES not in args_to_append:
871        args_to_append.append(constants.TF_ENABLE_PARAMETERIZED_MODULES)
872        for exclude_parameter in constants.DEFAULT_EXCLUDE_PARAS:
873          args_to_append.append('--exclude-module-parameters')
874          args_to_append.append(exclude_parameter)
875    return args_to_append, args_not_supported
876
877  def generate_run_commands(self, test_infos, extra_args, port=None):
878    """Generate a single run command from TestInfos.
879
880    Args:
881        test_infos: A list of TestInfo instances.
882        extra_args: A Dict of extra args to append.
883        port: Optional. An int of the port number to send events to. If None,
884          then subprocess reporter in TF won't try to connect.
885
886    Returns:
887        A list that contains the string of atest tradefed run command.
888        Only one command is returned.
889    """
890    if any(
891        'performance-tests' in info.compatibility_suites for info in test_infos
892    ):
893      self.run_cmd_dict['template'] = 'template/performance-tests-base'
894    elif extra_args.get(constants.USE_TF_MIN_BASE_TEMPLATE):
895      self.run_cmd_dict['template'] = self._TF_LOCAL_MIN
896    else:
897      self.run_cmd_dict['template'] = (
898          self._TF_DEVICELESS_TEST_TEMPLATE
899          if extra_args.get(constants.HOST)
900          else self._TF_DEVICE_TEST_TEMPLATE
901      )
902
903    args = self._create_test_args(test_infos, extra_args)
904
905    # Create a copy of args as more args could be added to the list.
906    test_args = list(args)
907    if port:
908      test_args.extend(['--subprocess-report-port', str(port)])
909    if extra_args.get(constants.INVOCATION_ID, None):
910      test_args.append(
911          '--invocation-data invocation_id=%s'
912          % extra_args[constants.INVOCATION_ID]
913      )
914    if extra_args.get(constants.WORKUNIT_ID, None):
915      test_args.append(
916          '--invocation-data work_unit_id=%s'
917          % extra_args[constants.WORKUNIT_ID]
918      )
919    if extra_args.get(constants.LOCAL_BUILD_ID, None):
920      # TODO: (b/207584685) Replace with TF local build solutions.
921      test_args.append('--use-stub-build true')
922      test_args.append(
923          '--stub-build-id %s' % extra_args[constants.LOCAL_BUILD_ID]
924      )
925      test_args.append(
926          '--stub-build-target %s' % extra_args[constants.BUILD_TARGET]
927      )
928    for info in test_infos:
929      if atest_utils.get_test_and_mainline_modules(info.test_name):
930        # TODO(b/253641058) Remove this once mainline module
931        # binaries are stored under testcase directory.
932        if not extra_args.get(constants.DRY_RUN):
933          self._copy_mainline_module_binary(info.mainline_modules)
934        test_args.append(constants.TF_ENABLE_MAINLINE_PARAMETERIZED_MODULES)
935        break
936    # For detailed logs, set TF options log-level/log-level-display as
937    # 'VERBOSE' by default.
938    log_level = 'VERBOSE'
939    test_args.extend(['--log-level-display', log_level])
940    test_args.extend(['--log-level', log_level])
941
942    # Set no-early-device-release by default to speed up TF teardown time.
943    # TODO(b/300882567) remove this forever when it's the default behavor.
944    test_args.extend(['--no-early-device-release'])
945
946    args_to_add, args_not_supported = self._parse_extra_args(
947        test_infos, extra_args
948    )
949
950    # If multiple devices in test config, automatically append
951    # --replicate-parent-setup and --multi-device-count
952    device_count = atest_configs.GLOBAL_ARGS.device_count_config
953    if device_count and device_count > 1:
954      args_to_add.append('--replicate-parent-setup')
955      args_to_add.append('--multi-device-count')
956      args_to_add.append(str(device_count))
957      os.environ.pop(constants.ANDROID_SERIAL, None)
958    else:
959      # TODO(b/122889707) Remove this after finding the root cause.
960      env_serial = os.environ.get(constants.ANDROID_SERIAL)
961      # Use the env variable ANDROID_SERIAL if it's set by user but only
962      # when the target tests are not deviceless tests.
963      if (
964          env_serial
965          and '--serial' not in args_to_add
966          and '-n' not in args_to_add
967      ):
968        args_to_add.append('--serial')
969        args_to_add.append(env_serial)
970
971    test_args.extend(args_to_add)
972    if args_not_supported:
973      atest_utils.print_and_log_info(
974          '%s does not support the following args %s',
975          self.EXECUTABLE,
976          args_not_supported,
977      )
978
979    # Only need to check one TestInfo to determine if the tests are
980    # configured in TEST_MAPPING.
981    for_test_mapping = test_infos and test_infos[0].from_test_mapping
982    if is_log_upload_enabled(extra_args):
983      test_args.extend(atest_utils.get_result_server_args(for_test_mapping))
984    self.run_cmd_dict['args'] = ' '.join(test_args)
985    self.run_cmd_dict['tf_customize_template'] = (
986        self._extract_customize_tf_templates(extra_args)
987    )
988
989    # By default using ATestFileSystemLogSaver no matter what running under
990    # aosp or internal branches. Only switch using google log saver if user
991    # tend to upload test result to AnTS which could be detected by the
992    # invocation_id in extra args.
993    if is_log_upload_enabled(extra_args):
994      self.use_google_log_saver()
995
996    run_commands = [self._RUN_CMD.format(**self.run_cmd_dict)]
997    logging.debug('TF test runner generated run commands %s', run_commands)
998    return run_commands
999
1000  def _flatten_test_infos(self, test_infos):
1001    """Sort and group test_infos by module_name and sort and group filters
1002
1003    by class name.
1004
1005        Example of three test_infos in a set:
1006            Module1, {(classA, {})}
1007            Module1, {(classB, {Method1})}
1008            Module1, {(classB, {Method2}}
1009        Becomes a set with one element:
1010            Module1, {(ClassA, {}), (ClassB, {Method1, Method2})}
1011        Where:
1012              Each line is a test_info namedtuple
1013              {} = Frozenset
1014              () = TestFilter namedtuple
1015
1016    Args:
1017        test_infos: A list of TestInfo namedtuples.
1018
1019    Returns:
1020        A list of TestInfos flattened.
1021    """
1022    results = []
1023    for module, group in atest_utils.sort_and_group(
1024        test_infos, lambda x: x.test_name
1025    ):
1026
1027      # module is a string, group is a generator of grouped TestInfos.
1028      # Module Test, so flatten test_infos:
1029      no_filters = False
1030      filters = set()
1031      test_runner = None
1032      test_finder = None
1033      build_targets = set()
1034      data = {}
1035      module_args = []
1036      for test_info_i in group:
1037        data.update(test_info_i.data)
1038        # Extend data with constants.TI_MODULE_ARG instead of
1039        # overwriting.
1040        module_args.extend(test_info_i.data.get(constants.TI_MODULE_ARG, []))
1041        test_runner = test_info_i.test_runner
1042        test_finder = test_info_i.test_finder
1043        build_targets |= test_info_i.build_targets
1044        test_filters = test_info_i.data.get(constants.TI_FILTER)
1045        if not test_filters or no_filters:
1046          # test_info wants whole module run, so hardcode no filters.
1047          no_filters = True
1048          filters = set()
1049          continue
1050        filters |= test_filters
1051      if module_args:
1052        data[constants.TI_MODULE_ARG] = module_args
1053      data[constants.TI_FILTER] = self.flatten_test_filters(filters)
1054      results.append(
1055          test_info.TestInfo(
1056              test_name=module,
1057              test_runner=test_runner,
1058              test_finder=test_finder,
1059              build_targets=build_targets,
1060              data=data,
1061          )
1062      )
1063    return results
1064
1065  @staticmethod
1066  def flatten_test_filters(filters):
1067    """Sort and group test_filters by class_name.
1068
1069        Example of three test_filters in a frozenset:
1070            classA, {}
1071            classB, {Method1}
1072            classB, {Method2}
1073        Becomes a frozenset with these elements:
1074            classA, {}
1075            classB, {Method1, Method2}
1076        Where:
1077            Each line is a TestFilter namedtuple
1078            {} = Frozenset
1079
1080    Args:
1081        filters: A frozenset of test_filters.
1082
1083    Returns:
1084        A frozenset of test_filters flattened.
1085    """
1086    results = set()
1087    for class_name, group in atest_utils.sort_and_group(
1088        filters, lambda x: x.class_name
1089    ):
1090
1091      # class_name is a string, group is a generator of TestFilters
1092      assert class_name is not None
1093      methods = set()
1094      for test_filter in group:
1095        if not test_filter.methods:
1096          # Whole class should be run
1097          methods = set()
1098          break
1099        methods |= test_filter.methods
1100      results.add(test_info.TestFilter(class_name, frozenset(methods)))
1101    return frozenset(results)
1102
1103  def _is_all_tests_parameter_auto_enabled(self, test_infos):
1104    """Check if all the test infos are parameter auto enabled.
1105
1106    Args:
1107        test_infos: A set of TestInfo instances.
1108
1109    Returns: True if all tests are parameter auto enabled, False otherwise.
1110    """
1111    for info in test_infos:
1112      if not self._is_parameter_auto_enabled_cfg(info, self.module_info):
1113        return False
1114    return True
1115
1116  def _create_test_args(
1117      self, test_infos: list[TestInfo], extra_args: Dict[str, Any]
1118  ) -> list[str]:
1119    """Compile TF command line args based on the given test infos.
1120
1121    Args:
1122        test_infos: A list of TestInfo instances.
1123        extra_args: A Dict of extra args for test runners to utilize.
1124
1125    Returns: A list of TF arguments to run the tests.
1126    """
1127    args = []
1128    if not test_infos:
1129      return []
1130
1131    if atest_configs.GLOBAL_ARGS.group_test:
1132      test_infos = self._flatten_test_infos(test_infos)
1133
1134    has_integration_test = False
1135
1136    # Because current --include-filter arg will not working if ATest pass
1137    # both --module and --include-filter to TF, only test by --module will
1138    # be run. Make a check first, only use --module if all tests are all
1139    # parameter auto enabled.
1140    # Only auto-enable the parameter if there's only one test.
1141    # TODO: (b/228433541) Remove the limitation after the root cause fixed.
1142    use_module_arg = False
1143    if len(test_infos) <= 1:
1144      use_module_arg = self._is_all_tests_parameter_auto_enabled(test_infos)
1145
1146    for info in test_infos:
1147      # Integration test exists in TF's jar, so it must have the option
1148      # if it's integration finder.
1149      if info.test_finder in _INTEGRATION_FINDERS:
1150        has_integration_test = True
1151      # For non-parameterize test module, use --include-filter, but for
1152      # tests which have auto enable parameterize config use --module
1153      # instead.
1154      if use_module_arg and self._is_parameter_auto_enabled_cfg(
1155          info, self.module_info
1156      ):
1157        args.extend([constants.TF_MODULE_FILTER, info.test_name])
1158      else:
1159        args.extend([constants.TF_INCLUDE_FILTER, info.test_name])
1160      for option in info.data.get(constants.TI_MODULE_ARG, []):
1161        if constants.TF_INCLUDE_FILTER_OPTION == option[0]:
1162          suite_filter = constants.TF_SUITE_FILTER_ARG_VALUE_FMT.format(
1163              test_name=info.test_name, option_value=option[1]
1164          )
1165          args.extend([constants.TF_INCLUDE_FILTER, suite_filter])
1166        elif constants.TF_EXCLUDE_FILTER_OPTION == option[0]:
1167          suite_filter = constants.TF_SUITE_FILTER_ARG_VALUE_FMT.format(
1168              test_name=info.test_name, option_value=option[1]
1169          )
1170          args.extend([constants.TF_EXCLUDE_FILTER, suite_filter])
1171        else:
1172          module_arg = constants.TF_MODULE_ARG_VALUE_FMT.format(
1173              test_name=info.test_name,
1174              option_name=option[0],
1175              option_value=option[1],
1176          )
1177          args.extend([constants.TF_MODULE_ARG, module_arg])
1178
1179    # Add ATest include filter
1180    args.extend(
1181        get_include_filter(
1182            test_infos, extra_args.get(constants.TEST_FILTER, None)
1183        )
1184    )
1185
1186    # TODO (b/141090547) Pass the config path to TF to load configs.
1187    # Compile option in TF if finder is not INTEGRATION or not set.
1188    if not has_integration_test:
1189      args.append(constants.TF_SKIP_LOADING_CONFIG_JAR)
1190    return args
1191
1192  def _extract_rerun_options(self, extra_args):
1193    """Extract rerun options to a string for output.
1194
1195    Args:
1196        extra_args: Dict of extra args for test runners to use.
1197
1198    Returns: A string of rerun options.
1199    """
1200    extracted_options = [
1201        '{} {}'.format(arg, extra_args[arg])
1202        for arg in extra_args
1203        if arg in self._RERUN_OPTION_GROUP
1204    ]
1205    return ' '.join(extracted_options)
1206
1207  def _extract_customize_tf_templates(self, extra_args: dict[str]) -> str:
1208    """Extract tradefed template options to a string for output.
1209
1210    Args:
1211        extra_args: Dict of extra args for test runners to use.
1212
1213    Returns:
1214        A string of tradefed template options.
1215    """
1216    tf_templates = extra_args.get(constants.TF_TEMPLATE, [])
1217    return ' '.join(['--template:map %s' % x for x in tf_templates])
1218
1219  def _handle_log_associations(self, event_handlers):
1220    """Handle TF's log associations information data.
1221
1222    log_association dict:
1223    {'loggedFile': '/tmp/serial-util11375755456514097276.ser',
1224     'dataName': 'device_logcat_setup_127.0.0.1:58331',
1225     'time': 1602038599.856113},
1226
1227    Args:
1228        event_handlers: Dict of {socket_object:EventHandler}.
1229    """
1230    log_associations = []
1231    for _, event_handler in event_handlers.items():
1232      if event_handler.log_associations:
1233        log_associations += event_handler.log_associations
1234    device_test_end_log_time = ''
1235    device_teardown_log_time = ''
1236    for log_association in log_associations:
1237      if 'device_logcat_test' in log_association.get('dataName', ''):
1238        device_test_end_log_time = log_association.get('time')
1239      if 'device_logcat_teardown' in log_association.get('dataName', ''):
1240        device_teardown_log_time = log_association.get('time')
1241    if device_test_end_log_time and device_teardown_log_time:
1242      teardowntime = float(device_teardown_log_time) - float(
1243          device_test_end_log_time
1244      )
1245      logging.debug('TF logcat teardown time=%s seconds.', teardowntime)
1246      metrics.LocalDetectEvent(
1247          detect_type=DetectType.TF_TEARDOWN_LOGCAT, result=int(teardowntime)
1248      )
1249
1250  @staticmethod
1251  def _has_instant_app_config(test_infos, mod_info):
1252    """Check if one of the input tests defined instant app mode in config.
1253
1254    Args:
1255        test_infos: A set of TestInfo instances.
1256        mod_info: ModuleInfo object.
1257
1258    Returns: True if one of the tests set up instant app mode.
1259    """
1260    for tinfo in test_infos:
1261      test_config, _ = test_finder_utils.get_test_config_and_srcs(
1262          tinfo, mod_info
1263      )
1264      if test_config:
1265        parameters = atest_utils.get_config_parameter(test_config)
1266        if constants.TF_PARA_INSTANT_APP in parameters:
1267          return True
1268    return False
1269
1270  @staticmethod
1271  def _is_parameter_auto_enabled_cfg(tinfo, mod_info):
1272    """Check if input tests contains auto enable support parameters.
1273
1274    Args:
1275        test_infos: A set of TestInfo instances.
1276        mod_info: ModuleInfo object.
1277
1278    Returns: True if input test has parameter setting which is not in the
1279             exclude list.
1280    """
1281    test_config, _ = test_finder_utils.get_test_config_and_srcs(tinfo, mod_info)
1282    if test_config:
1283      parameters = atest_utils.get_config_parameter(test_config)
1284      if (
1285          parameters
1286          - constants.DEFAULT_EXCLUDE_PARAS
1287          - constants.DEFAULT_EXCLUDE_NOT_PARAS
1288      ):
1289        return True
1290    return False
1291
1292  def _handle_native_tests(self, test_infos):
1293    """Handling some extra tasks for running native tests from tradefed.
1294
1295    Args:
1296        test_infos: A set of TestInfo instances.
1297    """
1298    for tinfo in test_infos:
1299      test_config, _ = test_finder_utils.get_test_config_and_srcs(
1300          tinfo, self.module_info
1301      )
1302      if test_config:
1303        module_name, device_path = atest_utils.get_config_gtest_args(
1304            test_config
1305        )
1306        if module_name and device_path:
1307          atest_utils.copy_native_symbols(module_name, device_path)
1308
1309  # TODO(b/253641058) remove copying files once mainline module
1310  # binaries are stored under testcase directory.
1311  def _copy_mainline_module_binary(self, mainline_modules):
1312    """Copies mainline module binaries to out/dist/mainline_modules_{arch}
1313
1314    Copies the mainline module binaries to the location that
1315    MainlineModuleHandler in TF expects since there is no way to
1316    explicitly tweak the search path.
1317
1318    Args:
1319        mainline_modules: A list of mainline modules.
1320    """
1321    config = atest_utils.get_android_config()
1322    arch = config.get('TARGET_ARCH')
1323    dest_dir = atest_utils.DIST_OUT_DIR.joinpath(f'mainline_modules_{arch}')
1324    dest_dir.mkdir(parents=True, exist_ok=True)
1325
1326    for module in mainline_modules:
1327      target_module_info = self.module_info.get_module_info(module)
1328      installed_paths = target_module_info[constants.MODULE_INSTALLED]
1329
1330      for installed_path in installed_paths:
1331        file_name = Path(installed_path).name
1332        dest_path = Path(dest_dir).joinpath(file_name)
1333        if dest_path.exists():
1334          atest_utils.colorful_print(
1335              'Replacing APEX in %s with %s' % (dest_path, installed_path),
1336              constants.CYAN,
1337          )
1338          logging.debug(
1339              'deleting the old file: %s and copy a new binary', dest_path
1340          )
1341          dest_path.unlink()
1342        shutil.copyfile(installed_path, dest_path)
1343
1344        break
1345
1346  def use_google_log_saver(self):
1347    """Replace the original log saver to google log saver."""
1348    self.log_args.update({
1349        'log_root_option_name': constants.GOOGLE_LOG_SAVER_LOG_ROOT_OPTION_NAME,
1350        'log_ext_option': constants.GOOGLE_LOG_SAVER_EXT_OPTION,
1351    })
1352    self.run_cmd_dict.update({
1353        'log_saver': constants.GOOGLE_LOG_SAVER,
1354        'log_args': self._LOG_ARGS.format(**self.log_args),
1355    })
1356
1357
1358def is_log_upload_enabled(extra_args: Dict[str, Any]) -> bool:
1359  """Check if input extra_args include google log saver related args.
1360
1361  Args:
1362      extra_args: Dict of args.
1363  """
1364  return bool(extra_args.get(constants.INVOCATION_ID, None))
1365
1366
1367def generate_annotation_filter_args(
1368    arg_value: Any,
1369    mod_info: module_info.ModuleInfo,
1370    test_infos: List[test_info.TestInfo],
1371) -> List[str]:
1372  """Generate TF annotation filter arguments.
1373
1374  Args:
1375      arg_value: Argument value for annotation filter.
1376      mod_info: ModuleInfo object.
1377      test_infos: A set of TestInfo instances.
1378
1379  Returns:
1380      List of TF annotation filter arguments.
1381  """
1382  annotation_filter_args = []
1383  for info in test_infos:
1384    test_name = info.test_name
1385    for keyword in arg_value:
1386      annotation = atest_utils.get_full_annotation_class_name(
1387          mod_info.get_module_info(test_name), keyword
1388      )
1389      if annotation:
1390        module_arg = constants.TF_MODULE_ARG_VALUE_FMT.format(
1391            test_name=test_name,
1392            option_name=constants.INCLUDE_ANNOTATION,
1393            option_value=annotation,
1394        )
1395        annotation_filter_args.extend([constants.TF_MODULE_ARG, module_arg])
1396      atest_utils.print_and_log_error(
1397          atest_utils.mark_red(f'Cannot find similar annotation: {keyword}')
1398      )
1399  return annotation_filter_args
1400
1401
1402def extra_args_to_tf_args(
1403    extra_args: Dict[str, Any], mod_info: module_info.ModuleInfo = None
1404) -> Tuple[Dict[str, Any], Dict[str, Any]]:
1405  """Convert the extra args into atest_tf_test_runner supported args.
1406
1407  Args:
1408      extra_args: Dict of args
1409      mod_info: ModuleInfo object.
1410
1411  Returns:
1412      Tuple of ARGS that atest_tf supported and not supported.
1413  """
1414  supported_args = []
1415  unsupported_args = []
1416
1417  def constant_list(*value):
1418    return lambda *_: value
1419
1420  # pylint: disable=unused-argument
1421  def print_message(message):
1422    def inner(*args):
1423      print(message)
1424      return []
1425
1426    return inner
1427
1428  # Mapping supported TF arguments to the processing function.
1429  supported_tf_args = dict({
1430      constants.WAIT_FOR_DEBUGGER: constant_list('--wait-for-debugger'),
1431      constants.DISABLE_INSTALL: constant_list('--disable-target-preparers'),
1432      constants.SERIAL: lambda arg_value: [
1433          j for d in arg_value for j in ('--serial', d)
1434      ],
1435      constants.SHARDING: lambda arg_value: ['--shard-count', str(arg_value)],
1436      constants.DISABLE_TEARDOWN: constant_list('--disable-teardown'),
1437      constants.HOST: constant_list(
1438          '-n', '--prioritize-host-config', '--skip-host-arch-check'
1439      ),
1440      constants.CUSTOM_ARGS:
1441      # custom args value is a list.
1442      lambda arg_value: arg_value,
1443      constants.ALL_ABI: constant_list('--all-abi'),
1444      constants.INSTANT: constant_list(
1445          constants.TF_ENABLE_PARAMETERIZED_MODULES,
1446          constants.TF_MODULE_PARAMETER,
1447          'instant_app',
1448      ),
1449      constants.USER_TYPE: lambda arg_value: [
1450          constants.TF_ENABLE_PARAMETERIZED_MODULES,
1451          '--enable-optional-parameterization',
1452          constants.TF_MODULE_PARAMETER,
1453          str(arg_value),
1454      ],
1455      constants.ITERATIONS: lambda arg_value: [
1456          '--retry-strategy',
1457          constants.ITERATIONS,
1458          '--max-testcase-run-count',
1459          str(arg_value),
1460      ],
1461      constants.RERUN_UNTIL_FAILURE: lambda arg_value: [
1462          '--retry-strategy',
1463          constants.RERUN_UNTIL_FAILURE,
1464          '--max-testcase-run-count',
1465          str(arg_value),
1466      ],
1467      constants.RETRY_ANY_FAILURE: lambda arg_value: [
1468          '--retry-strategy',
1469          constants.RETRY_ANY_FAILURE,
1470          '--max-testcase-run-count',
1471          str(arg_value),
1472      ],
1473      constants.COLLECT_TESTS_ONLY: constant_list('--collect-tests-only'),
1474      constants.TF_DEBUG: print_message('Please attach process to your IDE...'),
1475      constants.ANNOTATION_FILTER: generate_annotation_filter_args,
1476      constants.TEST_FILTER: lambda arg_value: [
1477          '--test-arg',
1478          (
1479              'com.android.tradefed.testtype.AndroidJUnitTest:'
1480              f'include-filter:{arg_value}'
1481          ),
1482          '--test-arg',
1483          (
1484              'com.android.tradefed.testtype.GTest:native-test-flag:'
1485              f'--gtest_filter={arg_value}'
1486          ),
1487          '--test-arg',
1488          (
1489              'com.android.tradefed.testtype.HostGTest:native-test-flag:'
1490              f'--gtest_filter={arg_value}'
1491          ),
1492      ],
1493      constants.TEST_TIMEOUT: lambda arg_value: [
1494          '--test-arg',
1495          (
1496              'com.android.tradefed.testtype.AndroidJUnitTest:'
1497              f'shell-timeout:{arg_value}'
1498          ),
1499          '--test-arg',
1500          (
1501              'com.android.tradefed.testtype.AndroidJUnitTest:'
1502              f'test-timeout:{arg_value}'
1503          ),
1504          '--test-arg',
1505          (
1506              'com.android.tradefed.testtype.HostGTest:'
1507              f'native-test-timeout:{arg_value}'
1508          ),
1509          '--test-arg',
1510          (
1511              'com.android.tradefed.testtype.GTest:'
1512              f'native-test-timeout:{arg_value}'
1513          ),
1514          '--test-arg',
1515          (
1516              'com.android.compatibility.testtype.LibcoreTest:'
1517              f'test-timeout:{arg_value}'
1518          ),
1519      ],
1520      constants.COVERAGE: lambda _: coverage.tf_args(mod_info),
1521      _INCREMENTAL_SETUP_KEY: constant_list('--incremental-setup=YES'),
1522  })
1523
1524  for arg in extra_args:
1525    if arg in supported_tf_args:
1526      tf_args = supported_tf_args[arg](extra_args[arg])
1527      if tf_args:
1528        supported_args.extend(tf_args)
1529      continue
1530
1531    if arg in (
1532        constants.TF_TEMPLATE,
1533        constants.INVOCATION_ID,
1534        constants.WORKUNIT_ID,
1535        constants.REQUEST_UPLOAD_RESULT,
1536        constants.DISABLE_UPLOAD_RESULT,
1537        constants.LOCAL_BUILD_ID,
1538        constants.BUILD_TARGET,
1539        constants.DRY_RUN,
1540        constants.DEVICE_ONLY,
1541    ):
1542      continue
1543    unsupported_args.append(arg)
1544  return supported_args, unsupported_args
1545
1546
1547def get_include_filter(
1548    test_infos: List[test_info.TestInfo], test_filter_arg: str = None
1549) -> List[str]:
1550  """Generate a list of tradefed filter argument from TestInfos.
1551
1552  Args:
1553      test_infos: a List of TestInfo object.
1554      test_filter_arg: the value of the desired test filter passed by the user
1555        using the --test-filter flag.
1556
1557  The include filter pattern looks like:
1558      --atest-include-filter <module-name>:<include-filter-value>
1559
1560  Returns:
1561      List of Tradefed command args.
1562  """
1563  tf_args = []
1564  for info in test_infos:
1565    # If a --test-filter is specified by the user, use the test filter in addition to the
1566    # fully qualified module:test#method name for each test.
1567    if test_filter_arg:
1568      formatted_test_filter_arg = (
1569          constants.TF_ATEST_INCLUDE_FILTER_VALUE_FMT.format(
1570              test_name=info.test_name, test_filter=test_filter_arg
1571          )
1572      )
1573      tf_args.extend(
1574          [constants.TF_ATEST_INCLUDE_FILTER, formatted_test_filter_arg]
1575      )
1576    filters = []
1577    for test_info_filter in info.data.get(constants.TI_FILTER, []):
1578      filters.extend(test_info_filter.to_list_of_tf_strings())
1579    for test_filter in filters:
1580      filter_arg = constants.TF_ATEST_INCLUDE_FILTER_VALUE_FMT.format(
1581          test_name=info.test_name, test_filter=test_filter
1582      )
1583      tf_args.extend([constants.TF_ATEST_INCLUDE_FILTER, filter_arg])
1584  return tf_args
1585
1586
1587@enum.unique
1588class Variant(enum.Enum):
1589  """The variant of a build module."""
1590
1591  NONE = ''
1592  HOST = 'host'
1593  DEVICE = 'target'
1594
1595  def __init__(self, suffix):
1596    self._suffix = suffix
1597
1598  @property
1599  def suffix(self) -> str:
1600    """The suffix without the 'dash' used to qualify build targets."""
1601    return self._suffix
1602
1603
1604@dataclasses.dataclass(frozen=True)
1605class Target:
1606  """A build target."""
1607
1608  module_name: str
1609  variant: Variant
1610
1611  def name(self) -> str:
1612    """The name to use on the command-line to build this target."""
1613    if not self.variant.suffix:
1614      return self.module_name
1615    return f'{self.module_name}-{self.variant.suffix}'
1616
1617
1618class Test(ABC):
1619  """A test that can be run."""
1620
1621  _DEFAULT_HARNESS_TARGETS = frozenset(
1622      [
1623          Target('atest-tradefed', Variant.HOST),
1624          Target('atest_script_help.sh', Variant.HOST),
1625          Target('atest_tradefed.sh', Variant.HOST),
1626          Target('tradefed', Variant.HOST),
1627      ]
1628      + [Target(t, Variant.HOST) for t in constants.GTF_TARGETS]
1629  )
1630
1631  def query_build_targets(self) -> Set[Target]:
1632    """Returns the list of build targets required to run this test."""
1633    build_targets = set()
1634    build_targets.update(self._get_harness_build_targets())
1635    build_targets.update(self._get_test_build_targets())
1636    return build_targets
1637
1638  @abstractmethod
1639  def query_runtime_targets(self) -> Set[Target]:
1640    """Returns the list of targets required during runtime."""
1641
1642  @abstractmethod
1643  def _get_test_build_targets(self) -> Set[Target]:
1644    """Returns the list of build targets of test and its dependencies."""
1645
1646  @abstractmethod
1647  def _get_harness_build_targets(self) -> Set[Target]:
1648    """Returns the list of build targets of test harness and its dependencies."""
1649
1650  @abstractmethod
1651  def requires_device(self) -> bool:
1652    """Returns true if the test requires a device, otherwise false."""
1653
1654  @abstractmethod
1655  def requires_device_update(self) -> bool:
1656    """Checks whether the test requires device update."""
1657
1658
1659class DeviceTest(Test):
1660  """A device test that can be run."""
1661
1662  def __init__(
1663      self, info: Dict[str, Any], variant: Variant, mainline_modules: Set[str]
1664  ):
1665
1666    self._info = info
1667    self._variant = variant
1668    self._mainline_modules = mainline_modules
1669
1670  def query_runtime_targets(self) -> Set[Target]:
1671    return self.query_build_targets() | _get_host_required_deps(self._info)
1672
1673  def requires_device(self):
1674    return True
1675
1676  def requires_device_update(self):
1677    # The test doesn't need device update as long as it's a unit test,
1678    # no matter if it's running on device or host.
1679    # Some tests (e.g. TF integration tests) do not have module info, and we
1680    # can't determine whether they require device update or not. So that we
1681    # treat them as they require device update to avoid disabling the device
1682    # update mistakenly.
1683    return not self._info or not module_info.ModuleInfo.is_unit_test(self._info)
1684
1685  def _get_test_build_targets(self) -> Set[Target]:
1686    module_name = self._info[constants.MODULE_INFO_ID]
1687    build_targets = set([Target(module_name, self._variant)])
1688    build_targets.update(_get_libs_deps(self._info, self._variant))
1689    build_targets.update(
1690        Target(m, Variant.NONE) for m in self._mainline_modules
1691    )
1692    return build_targets
1693
1694  def _get_harness_build_targets(self):
1695    build_targets = set(Test._DEFAULT_HARNESS_TARGETS)
1696    build_targets.update(
1697        set([
1698            Target('adb', Variant.HOST),
1699            Target('aapt', Variant.HOST),
1700            Target('aapt2', Variant.HOST),
1701            Target('compatibility-host-util', Variant.HOST),
1702        ])
1703    )
1704
1705    # Auto-generated Java tests use a module template that uses the Dalvik
1706    # test runner and requires the implementation jars. See
1707    # https://source.corp.google.com/android-internal/build/make/core/java_test_config_template.xml.
1708    # These dependencies should ideally be automatically added by the build
1709    # rule since Atest can fall out of sync otherwise.
1710    # TODO(b/284987354): Remove these targets once the build rule adds the
1711    # required deps.
1712    if _is_dalvik_test_module(self._info):
1713      build_targets.add(Target('cts-dalvik-host-test-runner', Variant.HOST))
1714      build_targets.add(Target('cts-dalvik-device-test-runner', Variant.DEVICE))
1715
1716    if 'vts' in self._info.get(constants.MODULE_COMPATIBILITY_SUITES, []):
1717      # Note that we do not include `compatibility-tradefed` which is
1718      # already included in the VTS harness.
1719      build_targets.add(Target('vts-core-tradefed-harness', Variant.HOST))
1720    else:
1721      build_targets.add(Target('compatibility-tradefed', Variant.HOST))
1722
1723    return build_targets
1724
1725
1726class DevicelessTest(Test):
1727
1728  def __init__(self, info: Dict[str, Any], variant: Variant):
1729    self._info = info
1730    self._variant = variant
1731
1732  def _get_test_build_targets(self) -> Set[Target]:
1733    module_name = self._info[constants.MODULE_INFO_ID]
1734    return set([Target(module_name, self._variant)])
1735
1736  def _get_harness_build_targets(self):
1737    build_targets = set(Test._DEFAULT_HARNESS_TARGETS)
1738    build_targets.update(
1739        set([
1740            # TODO(b/277116853): Remove the adb dependency for deviceless tests.
1741            Target('adb', Variant.HOST),
1742        ])
1743    )
1744    return build_targets
1745
1746  def query_runtime_targets(self) -> Set[Target]:
1747    return self.query_build_targets()
1748
1749  def requires_device(self):
1750    return False
1751
1752  def requires_device_update(self):
1753    return False
1754
1755
1756def _get_libs_deps(info: Dict[str, Any], variant: Variant) -> Set[Target]:
1757
1758  # We only need the runtime dependencies with host variant since TradeFed
1759  # won't push any runtime dependencies to the test device and the runtime
1760  # dependencies with device variant should already exist on the test device.
1761  if variant != Variant.HOST:
1762    return set()
1763
1764  deps = set()
1765  deps.update([Target(m, variant) for m in info.get(constants.MODULE_LIBS, [])])
1766
1767  return deps
1768
1769
1770def _get_host_required_deps(info: Dict[str, Any]) -> Set[Target]:
1771
1772  deps = set()
1773  deps.update(
1774      Target(m, Variant.HOST) for m in info.get(constants.MODULE_HOST_DEPS, [])
1775  )
1776
1777  return deps
1778
1779
1780def _is_dalvik_test_module(info: Dict[str, Any]) -> bool:
1781  return 'JAVA_LIBRARIES' in info.get(
1782      constants.MODULE_CLASS, []
1783  ) and True in info.get(constants.MODULE_AUTO_TEST_CONFIG, [])
1784