xref: /aosp_15_r20/tools/asuite/atest/atest_utils_unittest.py (revision c2e18aaa1096c836b086f94603d04f4eb9cf37f5)
1#!/usr/bin/env python3
2#
3# Copyright 2018, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unittests for atest_utils."""
18
19# pylint: disable=invalid-name
20
21import hashlib
22from io import StringIO
23import os
24from pathlib import Path
25import subprocess
26import sys
27import tempfile
28import unittest
29from unittest import mock
30
31from atest import arg_parser
32from atest import atest_error
33from atest import atest_utils
34from atest import constants
35from atest import unittest_constants
36from atest import unittest_utils
37from atest.atest_enum import FilterType
38from atest.test_finders import test_info
39from pyfakefs import fake_filesystem_unittest
40
41TEST_MODULE_NAME_A = 'ModuleNameA'
42TEST_RUNNER_A = 'FakeTestRunnerA'
43TEST_BUILD_TARGET_A = set(['bt1', 'bt2'])
44TEST_DATA_A = {'test_data_a_1': 'a1', 'test_data_a_2': 'a2'}
45TEST_SUITE_A = 'FakeSuiteA'
46TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
47TEST_INSTALL_LOC_A = set(['host', 'device'])
48TEST_FINDER_A = 'MODULE'
49TEST_INFO_A = test_info.TestInfo(
50    TEST_MODULE_NAME_A,
51    TEST_RUNNER_A,
52    TEST_BUILD_TARGET_A,
53    TEST_DATA_A,
54    TEST_SUITE_A,
55    TEST_MODULE_CLASS_A,
56    TEST_INSTALL_LOC_A,
57)
58TEST_INFO_A.test_finder = TEST_FINDER_A
59TEST_ZIP_DATA_DIR = 'zip_files'
60TEST_SINGLE_ZIP_NAME = 'single_file.zip'
61TEST_MULTI_ZIP_NAME = 'multi_file.zip'
62
63REPO_INFO_OUTPUT = """Manifest branch: test_branch
64Manifest merge branch: refs/heads/test_branch
65Manifest groups: all,-notdefault
66----------------------------
67"""
68
69
70class StreamIoOutputTest(unittest.TestCase):
71  """Class that tests the _stream_io_output function."""
72
73  def test_stream_io_output_no_max_lines_no_clear_line_code(self):
74    """Test when max_lines is None, no clear line code is written to the stream."""
75    io_input = StringIO()
76    io_input.write(f'1\n' * 10)
77    io_input.seek(0)
78    io_output = StringIO()
79
80    atest_utils.stream_io_output(
81        io_input, max_lines=None, io_output=io_output, is_io_output_atty=True
82    )
83
84    self.assertNotIn(
85        atest_utils._BASH_CLEAR_PREVIOUS_LINE_CODE, io_output.getvalue()
86    )
87
88  @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1))
89  def test_stream_io_output_wrap_long_lines(self, _):
90    """Test when max_lines is set, long lines will be wrapped."""
91    io_input = StringIO()
92    io_input.write(f'1' * 10)
93    io_input.seek(0)
94    io_output = StringIO()
95
96    atest_utils.stream_io_output(
97        io_input, max_lines=10, io_output=io_output, is_io_output_atty=True
98    )
99
100    self.assertIn('11111\n11111', io_output.getvalue())
101
102  @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1))
103  def test_stream_io_output_clear_lines_over_max_lines(self, _):
104    """Test when line exceeds max_lines, the previous lines are cleared."""
105    io_input = StringIO()
106    io_input.write('1\n2\n3\n')
107    io_input.seek(0)
108    io_output = StringIO()
109
110    atest_utils.stream_io_output(
111        io_input, max_lines=2, io_output=io_output, is_io_output_atty=True
112    )
113
114    self.assertIn(
115        '2\n3\n',
116        io_output.getvalue(),
117    )
118    self.assertNotIn(
119        '1\n2\n3\n',
120        io_output.getvalue(),
121    )
122
123  @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1))
124  def test_stream_io_output_no_clear_lines_under_max_lines(self, _):
125    """Test when line is under max_lines, the previous lines are not cleared."""
126    io_input = StringIO()
127    io_input.write('1\n2\n3\n')
128    io_input.seek(0)
129    io_output = StringIO()
130
131    atest_utils.stream_io_output(
132        io_input, max_lines=4, io_output=io_output, is_io_output_atty=True
133    )
134
135    self.assertIn(
136        '1\n2\n3\n',
137        io_output.getvalue(),
138    )
139
140  @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1))
141  def test_stream_io_output_no_lines_written_no_lines_cleared(self, _):
142    """Test when nothing is written, no lines are cleared."""
143    io_input = StringIO()
144    io_output = StringIO()
145
146    atest_utils.stream_io_output(
147        io_input, max_lines=2, io_output=io_output, is_io_output_atty=True
148    )
149
150    self.assertNotIn(
151        atest_utils._BASH_CLEAR_PREVIOUS_LINE_CODE,
152        io_output.getvalue(),
153    )
154
155  @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1))
156  def test_stream_io_output_replace_tab_with_spaces(self, _):
157    """Test when line exceeds max_lines, the previous lines are cleared."""
158    io_input = StringIO()
159    io_input.write('1\t2')
160    io_input.seek(0)
161    io_output = StringIO()
162
163    atest_utils.stream_io_output(
164        io_input, max_lines=2, io_output=io_output, is_io_output_atty=True
165    )
166
167    self.assertNotIn(
168        '\t',
169        io_output.getvalue(),
170    )
171
172
173class ConcatenatePathTest(unittest.TestCase):
174  """Class that tests path concatenation."""
175
176  @classmethod
177  def setUpClass(cls):
178    """Mock the environment variables for the entire test class"""
179    cls.build_top = '/src/build_top'
180    cls.prod_out = '/src/build_top/product_out'
181    cls.host_out = '/src/build_top/host_out'
182    cls.target_out_cases = '/src/build_top/product_out/testcases'
183    cls.host_out_cases = '/src/build_top/host_out/testcases'
184    cls.target_product = 'test_target_product'
185    cls.build_variant = 'test_build_variant'
186    cls.mock_getenv = mock.patch.dict(
187        os.environ,
188        {
189            'ANDROID_BUILD_TOP': cls.build_top,
190            'ANDROID_PRODUCT_OUT': cls.prod_out,
191            'ANDROID_TARGET_OUT_TESTCASES': cls.target_out_cases,
192            'ANDROID_HOST_OUT': cls.host_out,
193            'ANDROID_HOST_OUT_TESTCASES': cls.host_out_cases,
194            'TARGET_PRODUCT': cls.target_product,
195            'TARGET_BUILD_VARIANT': cls.build_variant,
196        },
197    )
198    cls.mock_getenv.start()
199
200  @classmethod
201  def tearDownClass(cls):
202    """Clean up the mocks after the test class finishes"""
203    cls.mock_getenv.stop()
204
205  def test_get_vars(self):
206    """Test the values of AndroidVariables are expected"""
207    variables = atest_utils.AndroidVariables()
208
209    self.assertEqual(variables.build_top, self.build_top)
210    self.assertEqual(variables.product_out, self.prod_out)
211    self.assertEqual(variables.target_out_cases, self.target_out_cases)
212    self.assertEqual(variables.host_out, self.host_out)
213    self.assertEqual(variables.host_out_cases, self.host_out_cases)
214    self.assertEqual(variables.target_product, self.target_product)
215    self.assertEqual(variables.build_variant, self.build_variant)
216
217  def test_atest_utils_get_build_top(self):
218    """Test concatenating strings with get_build_top()."""
219    expected_path = Path(self.build_top, 'path/to/project')
220
221    return_path = atest_utils.get_build_top('path/to/project')
222
223    self.assertEqual(expected_path, return_path)
224
225  def test_atest_utils_get_product_out(self):
226    """Test concatenating strings with get_product_out()."""
227    expected_path = Path(self.prod_out, 'module-info.json')
228
229    return_path = atest_utils.get_product_out('module-info.json')
230
231    self.assertEqual(expected_path, return_path)
232
233  def test_atest_utils_get_host_out(self):
234    """Test concatenating strings with get_host_out()."""
235    expected_path = Path(self.host_out, 'bin/adb')
236
237    return_path = atest_utils.get_host_out('bin', 'adb')
238
239    self.assertEqual(expected_path, return_path)
240
241
242class GetBuildOutDirTests(unittest.TestCase):
243  """Test get_build_out_dir() for various conditions."""
244
245  def setUp(self) -> None:
246    self.abs_OUT_DIR = '/somewhere/out'
247    self.rel_OUT_DIR = 'somewhere/out'
248    self.abs_OUT_DIR_COMMON_BASE = '/somewhere/common_out'
249    self.rel_OUT_DIR_COMMON_BASE = 'somewhere/common_out'
250
251  def test_get_build_abs_out_dir(self):
252    """Test when OUT_DIR is an absolute path."""
253    with mock.patch.dict(
254        'os.environ',
255        {
256            constants.ANDROID_BUILD_TOP: '/src/build/top',
257            'OUT_DIR': self.abs_OUT_DIR,
258        },
259    ):
260      expected_out_dir = Path(self.abs_OUT_DIR)
261
262      returned_out_dir = atest_utils.get_build_out_dir()
263
264      self.assertEqual(expected_out_dir, returned_out_dir)
265
266  def test_get_build_rel_out_dir(self):
267    """Test when OUT_DIR is a relative path."""
268    with mock.patch.dict(
269        'os.environ',
270        {
271            constants.ANDROID_BUILD_TOP: '/src/build/top',
272            'OUT_DIR': self.rel_OUT_DIR,
273        },
274    ):
275      expected_out_dir = atest_utils.get_build_top(self.rel_OUT_DIR)
276
277      returned_out_dir = atest_utils.get_build_out_dir()
278
279      self.assertEqual(expected_out_dir, returned_out_dir)
280
281  def test_get_build_abs_out_dir_common_base(self):
282    """Test whe OUT_DIR_COMMON_BASE is an absolute path."""
283    build_top_path = '/src/build/top'
284    branch_name = Path(build_top_path).name
285    with mock.patch.dict(
286        'os.environ',
287        {
288            constants.ANDROID_BUILD_TOP: build_top_path,
289            'OUT_DIR_COMMON_BASE': self.abs_OUT_DIR_COMMON_BASE,
290        },
291    ):
292      expected_out_dir = Path(self.abs_OUT_DIR_COMMON_BASE, branch_name)
293
294      returned_out_dir = atest_utils.get_build_out_dir()
295
296      self.assertEqual(expected_out_dir, returned_out_dir)
297
298  def test_get_build_rel_out_dir_common_base(self):
299    """Test whe OUT_DIR_COMMON_BASE is a relative path."""
300    build_top_path = '/src/build/top'
301    branch_name = Path(build_top_path).name
302    with mock.patch.dict(
303        'os.environ',
304        {
305            constants.ANDROID_BUILD_TOP: build_top_path,
306            'OUT_DIR_COMMON_BASE': self.rel_OUT_DIR_COMMON_BASE,
307        },
308    ):
309      expected_out_dir = Path(
310          build_top_path, self.rel_OUT_DIR_COMMON_BASE, branch_name
311      )
312
313      returned_out_dir = atest_utils.get_build_out_dir()
314
315      self.assertEqual(expected_out_dir, returned_out_dir)
316
317  def test_get_build_out_dir(self):
318    """Test when OUT_DIR and OUT_DIR_COMMON_BASE are null."""
319    with mock.patch.dict(
320        'os.environ', {constants.ANDROID_BUILD_TOP: '/src/build/top'}
321    ):
322      expected_out_dir = atest_utils.get_build_top('out')
323
324      returned_out_dir = atest_utils.get_build_out_dir()
325
326      self.assertEqual(expected_out_dir, returned_out_dir)
327
328
329# pylint: disable=protected-access
330# pylint: disable=too-many-public-methods
331class AtestUtilsUnittests(unittest.TestCase):
332  """Unit tests for atest_utils.py"""
333
334  def test_capture_fail_section_has_fail_section(self):
335    """Test capture_fail_section when has fail section."""
336    test_list = [
337        'AAAAAA',
338        'FAILED: Error1',
339        '^\n',
340        'Error2\n',
341        '[  6% 191/2997] BBBBBB\n',
342        'CCCCC',
343        '[  20% 322/2997] DDDDDD\n',
344        'EEEEE',
345    ]
346    want_list = ['FAILED: Error1', '^\n', 'Error2\n']
347    self.assertEqual(want_list, atest_utils._capture_fail_section(test_list))
348
349  def test_capture_fail_section_no_fail_section(self):
350    """Test capture_fail_section when no fail section."""
351    test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
352    want_list = []
353    self.assertEqual(want_list, atest_utils._capture_fail_section(test_list))
354
355  def test_is_test_mapping_none_test_mapping_args(self):
356    """Test method is_test_mapping."""
357    non_tm_args = ['--host-unit-test-only']
358
359    for argument in non_tm_args:
360      args = arg_parser.create_atest_arg_parser().parse_args([argument])
361      self.assertFalse(
362          atest_utils.is_test_mapping(args),
363          'Option %s indicates NOT a test_mapping!' % argument,
364      )
365
366  def test_is_test_mapping_test_mapping_args(self):
367    """Test method is_test_mapping."""
368    tm_args = ['--test-mapping', '--include-subdirs']
369
370    for argument in tm_args:
371      args = arg_parser.create_atest_arg_parser().parse_args([argument])
372      self.assertTrue(
373          atest_utils.is_test_mapping(args),
374          'Option %s indicates a test_mapping!' % argument,
375      )
376
377  def test_is_test_mapping_implicit_test_mapping(self):
378    """Test method is_test_mapping."""
379    args = arg_parser.create_atest_arg_parser().parse_args(
380        ['--test', '--build', ':postsubmit']
381    )
382    self.assertTrue(
383        atest_utils.is_test_mapping(args),
384        'Option %s indicates a test_mapping!' % args,
385    )
386
387  def test_is_test_mapping_with_testname(self):
388    """Test method is_test_mapping."""
389    irrelevant_args = ['--test', ':postsubmit', 'testname']
390
391    args = arg_parser.create_atest_arg_parser().parse_args(irrelevant_args)
392    self.assertFalse(
393        atest_utils.is_test_mapping(args),
394        'Option %s indicates a test_mapping!' % args,
395    )
396
397  def test_is_test_mapping_false(self):
398    """Test method is_test_mapping."""
399    args = arg_parser.create_atest_arg_parser().parse_args(
400        ['--test', '--build', 'hello_atest']
401    )
402
403    self.assertFalse(atest_utils.is_test_mapping(args))
404
405  def test_has_colors(self):
406    """Test method _has_colors."""
407    # stream is file I/O
408    stream = open('/tmp/test_has_colors.txt', 'wb')
409    self.assertFalse(atest_utils._has_colors(stream))
410    stream.close()
411
412    # stream is not a tty(terminal).
413    stream = mock.Mock()
414    stream.isatty.return_value = False
415    self.assertFalse(atest_utils._has_colors(stream))
416
417    # stream is a tty(terminal).
418    stream = mock.Mock()
419    stream.isatty.return_value = True
420    self.assertTrue(atest_utils._has_colors(stream))
421
422  @mock.patch('atest.atest_utils._has_colors')
423  def test_colorize(self, mock_has_colors):
424    """Test method colorize."""
425    original_str = 'test string'
426    green_no = 2
427
428    # _has_colors() return False.
429    mock_has_colors.return_value = False
430    converted_str = atest_utils.colorize(
431        original_str, green_no, bp_color=constants.RED
432    )
433    self.assertEqual(original_str, converted_str)
434
435    # Green text with red background.
436    mock_has_colors.return_value = True
437    converted_str = atest_utils.colorize(
438        original_str, green_no, bp_color=constants.RED
439    )
440    green_highlight_string = '\x1b[1;32;41m%s\x1b[0m' % original_str
441    self.assertEqual(green_highlight_string, converted_str)
442
443    # Green text, no background.
444    mock_has_colors.return_value = True
445    converted_str = atest_utils.colorize(original_str, green_no)
446    green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
447    self.assertEqual(green_no_highlight_string, converted_str)
448
449  @mock.patch('atest.atest_utils.colorful_print')
450  @mock.patch('logging.error')
451  def test_print_and_log_error_no_format_prints_and_logs(
452      self, mocked_print, locked_error_logging
453  ):
454    atest_utils.print_and_log_error('no format')
455
456    mocked_print.assert_called_once()
457    locked_error_logging.assert_called_once()
458
459  @mock.patch('atest.atest_utils.colorful_print')
460  def test_print_and_log_error_single_non_string_prints(self, mocked_print):
461    atest_utils.print_and_log_error(123)
462
463    mocked_print.assert_called_once()
464
465  @mock.patch('atest.atest_utils.colorful_print')
466  def test_print_and_log_error_with_format_prints(self, mocked_print):
467    atest_utils.print_and_log_error('1+1=%s', 2)
468
469    mocked_print.assert_called_once()
470
471  @mock.patch('atest.atest_utils.colorful_print')
472  def test_print_and_log_error_bad_value_no_throw_no_print(self, mocked_print):
473    atest_utils.print_and_log_error('bad format %', 'format arg')
474
475    mocked_print.assert_not_called()
476
477  @mock.patch('atest.atest_utils.colorful_print')
478  def test_print_and_log_error_missing_format_arg_no_print(self, mocked_print):
479    atest_utils.print_and_log_error('bad format %s %s', 'format arg')
480
481    mocked_print.assert_not_called()
482
483  @mock.patch('atest.atest_utils.colorful_print')
484  def test_print_and_log_error_extra_format_arg_no_print(self, mocked_print):
485    atest_utils.print_and_log_error(
486        'bad format %s', 'format arg1', 'format arg2'
487    )
488
489    mocked_print.assert_not_called()
490
491  @mock.patch('atest.atest_utils._has_colors')
492  def test_colorful_print(self, mock_has_colors):
493    """Test method colorful_print."""
494    testing_str = 'color_print_test'
495    green_no = 2
496
497    # _has_colors() return False.
498    mock_has_colors.return_value = False
499    capture_output = StringIO()
500    sys.stdout = capture_output
501    atest_utils.colorful_print(
502        testing_str, green_no, bp_color=constants.RED, auto_wrap=False
503    )
504    sys.stdout = sys.__stdout__
505    uncolored_string = testing_str
506    self.assertEqual(capture_output.getvalue(), uncolored_string)
507
508    # Green text with red background, but no wrap.
509    mock_has_colors.return_value = True
510    capture_output = StringIO()
511    sys.stdout = capture_output
512    atest_utils.colorful_print(
513        testing_str, green_no, bp_color=constants.RED, auto_wrap=False
514    )
515    sys.stdout = sys.__stdout__
516    green_highlight_no_wrap_string = '\x1b[1;32;41m%s\x1b[0m' % testing_str
517    self.assertEqual(capture_output.getvalue(), green_highlight_no_wrap_string)
518
519    # Green text, no background, no wrap.
520    mock_has_colors.return_value = True
521    capture_output = StringIO()
522    sys.stdout = capture_output
523    atest_utils.colorful_print(testing_str, green_no, auto_wrap=False)
524    sys.stdout = sys.__stdout__
525    green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str
526    self.assertEqual(capture_output.getvalue(), green_no_high_no_wrap_string)
527
528    # Green text with red background and wrap.
529    mock_has_colors.return_value = True
530    capture_output = StringIO()
531    sys.stdout = capture_output
532    atest_utils.colorful_print(
533        testing_str, green_no, bp_color=constants.RED, auto_wrap=True
534    )
535    sys.stdout = sys.__stdout__
536    green_highlight_wrap_string = '\x1b[1;32;41m%s\x1b[0m\n' % testing_str
537    self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string)
538
539    # Green text with wrap, but no background.
540    mock_has_colors.return_value = True
541    capture_output = StringIO()
542    sys.stdout = capture_output
543    atest_utils.colorful_print(testing_str, green_no, auto_wrap=True)
544    sys.stdout = sys.__stdout__
545    green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str
546    self.assertEqual(capture_output.getvalue(), green_wrap_no_highlight_string)
547
548  def test_is_supported_mainline_module(self):
549    """Test the installed artifacts are supported."""
550    self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apk'))
551    self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apks'))
552    self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apex'))
553    self.assertFalse(atest_utils.is_supported_mainline_module('out/foo.capex'))
554
555  def test_get_test_and_mainline_modules(self):
556    """Test whether the given test reference is a mainline module test."""
557    # regular test.
558    self.assertIsNone(atest_utils.get_test_and_mainline_modules('test_name'))
559    # missing trailing bracket.
560    self.assertIsNone(
561        atest_utils.get_test_and_mainline_modules('test_name[foo.apk+goo.apex')
562    )
563    # valid mainline module syntax
564    self.assertIsNotNone(
565        atest_utils.get_test_and_mainline_modules('test_name[foo.apk]')
566    )
567    self.assertIsNotNone(
568        atest_utils.get_test_and_mainline_modules('test_name[foo.apk+goo.apex]')
569    )
570
571  def test_get_test_info_cache_path(self):
572    """Test method get_test_info_cache_path."""
573    input_file_name = 'mytest_name'
574    cache_root = '/a/b/c'
575    expect_hashed_name = (
576        '%s.cache' % hashlib.md5(str(input_file_name).encode()).hexdigest()
577    )
578    self.assertEqual(
579        os.path.join(cache_root, expect_hashed_name),
580        atest_utils.get_test_info_cache_path(input_file_name, cache_root),
581    )
582
583  def test_get_and_load_cache(self):
584    """Test method update_test_info_cache and load_test_info_cache."""
585    test_reference = 'myTestRefA'
586    test_cache_dir = tempfile.mkdtemp()
587    atest_utils.update_test_info_cache(
588        test_reference, [TEST_INFO_A], test_cache_dir
589    )
590    unittest_utils.assert_equal_testinfo_sets(
591        self,
592        set([TEST_INFO_A]),
593        atest_utils.load_test_info_cache(test_reference, test_cache_dir),
594    )
595
596  @mock.patch('subprocess.check_output')
597  def test_get_modified_files(self, mock_co):
598    """Test method get_modified_files"""
599    mock_co.side_effect = [
600        x.encode('utf-8')
601        for x in ['/a/b/', '\n', 'test_fp1.java\nc/test_fp2.java']
602    ]
603    self.assertEqual(
604        {'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
605        atest_utils.get_modified_files(''),
606    )
607    mock_co.side_effect = [
608        x.encode('utf-8') for x in ['/a/b/', 'test_fp4', '/test_fp3.java']
609    ]
610    self.assertEqual(
611        {'/a/b/test_fp4', '/a/b/test_fp3.java'},
612        atest_utils.get_modified_files(''),
613    )
614
615  def test_delimiter(self):
616    """Test method delimiter"""
617    self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))
618
619  def test_has_python_module(self):
620    """Test method has_python_module"""
621    self.assertFalse(atest_utils.has_python_module('M_M'))
622    self.assertTrue(atest_utils.has_python_module('os'))
623
624  @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
625  def test_read_zip_single_text(self, _matched):
626    """Test method extract_zip_text include only one text file."""
627    zip_path = os.path.join(
628        unittest_constants.TEST_DATA_DIR,
629        TEST_ZIP_DATA_DIR,
630        TEST_SINGLE_ZIP_NAME,
631    )
632    expect_content = '\nfile1_line1\nfile1_line2\n'
633    self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
634
635  @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
636  def test_read_zip_multi_text(self, _matched):
637    """Test method extract_zip_text include multiple text files."""
638    zip_path = os.path.join(
639        unittest_constants.TEST_DATA_DIR, TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME
640    )
641    expect_content = '\nfile1_line1\nfile1_line2\n\nfile2_line1\nfile2_line2\n'
642    self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
643
644  def test_matched_tf_error_log(self):
645    """Test method extract_zip_text include multiple text files."""
646    matched_content = '05-25 17:37:04 E/XXXXX YYYYY'
647    not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY'
648    # Test matched content
649    self.assertEqual(True, atest_utils.matched_tf_error_log(matched_content))
650    # Test not matched content
651    self.assertEqual(
652        False, atest_utils.matched_tf_error_log(not_matched_content)
653    )
654
655  # pylint: disable=no-member
656  def test_read_test_record_proto(self):
657    """Test method read_test_record."""
658    test_record_file_path = os.path.join(
659        unittest_constants.TEST_DATA_DIR, 'test_record.proto.testonly'
660    )
661    test_record = atest_utils.read_test_record(test_record_file_path)
662    self.assertEqual(
663        test_record.children[0].inline_test_record.test_record_id,
664        'x86 hello_world_test',
665    )
666
667  def test_load_json_safely_file_inexistent(self):
668    """Test method load_json_safely if file does not exist."""
669    json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath(
670        'not_exist.json'
671    )
672    self.assertEqual({}, atest_utils.load_json_safely(json_file_path))
673
674  def test_load_json_safely_valid_json_format(self):
675    """Test method load_json_safely if file exists and format is valid."""
676    json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath(
677        'module-info.json'
678    )
679    content = atest_utils.load_json_safely(json_file_path)
680    self.assertEqual(
681        'MainModule1', content.get('MainModule1').get('module_name')
682    )
683    self.assertEqual(
684        [], content.get('MainModule2').get('test_mainline_modules')
685    )
686
687  def test_load_json_safely_invalid_json_format(self):
688    """Test method load_json_safely if file exist but content is invalid."""
689    json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath(
690        'not-valid-module-info.json'
691    )
692    self.assertEqual({}, atest_utils.load_json_safely(json_file_path))
693
694  @mock.patch('os.getenv')
695  def test_get_manifest_branch(self, mock_env):
696    """Test method get_manifest_branch"""
697    build_top = tempfile.TemporaryDirectory()
698    mock_env.return_value = build_top.name
699    repo_dir = Path(build_top.name).joinpath('.repo')
700    portal_xml = repo_dir.joinpath('manifest.xml')
701    manifest_dir = repo_dir.joinpath('manifests')
702    target_xml = manifest_dir.joinpath('Default.xml')
703    repo_dir.mkdir()
704    manifest_dir.mkdir()
705    content_portal = '<manifest><include name="Default.xml" /></manifest>'
706    content_manifest = """<manifest>
707            <remote name="aosp" fetch=".." review="https://android-review.googlesource.com/" />
708            <default revision="MONSTER-dev" remote="aosp" sync-j="4" />
709        </manifest>"""
710
711    # 1. The manifest.xml(portal) contains 'include' directive: 'Default.xml'.
712    # Search revision in .repo/manifests/Default.xml.
713    with open(portal_xml, 'w') as cache:
714      cache.write(content_portal)
715    with open(target_xml, 'w') as cache:
716      cache.write(content_manifest)
717    self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch())
718    self.assertEqual('aosp-MONSTER-dev', atest_utils.get_manifest_branch(True))
719    os.remove(target_xml)
720    os.remove(portal_xml)
721
722    # 2. The manifest.xml contains neither 'include' nor 'revision' directive,
723    # keep searching revision in .repo/manifests/default.xml by default.
724    with open(portal_xml, 'w') as cache:
725      cache.write('<manifest></manifest>')
726    default_xml = manifest_dir.joinpath('default.xml')
727    with open(default_xml, 'w') as cache:
728      cache.write(content_manifest)
729    self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch())
730    os.remove(default_xml)
731    os.remove(portal_xml)
732
733    # 3. revision was directly defined in 'manifest.xml'.
734    with open(portal_xml, 'w') as cache:
735      cache.write(content_manifest)
736    self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch())
737    os.remove(portal_xml)
738
739    # 4. Return None if the included xml does not exist.
740    with open(portal_xml, 'w') as cache:
741      cache.write(content_portal)
742    self.assertEqual('', atest_utils.get_manifest_branch())
743    os.remove(portal_xml)
744
745  def test_has_wildcard(self):
746    """Test method of has_wildcard"""
747    self.assertFalse(atest_utils.has_wildcard('test1'))
748    self.assertFalse(atest_utils.has_wildcard(['test1']))
749    self.assertTrue(atest_utils.has_wildcard('test1?'))
750    self.assertTrue(atest_utils.has_wildcard(['test1', 'b*', 'a?b*']))
751
752  # pylint: disable=anomalous-backslash-in-string
753  def test_quote(self):
754    """Test method of quote()"""
755    target_str = r'TEST_(F|P)[0-9].*\w$'
756    expected_str = "'TEST_(F|P)[0-9].*\w$'"
757    self.assertEqual(atest_utils.quote(target_str), expected_str)
758    self.assertEqual(atest_utils.quote('TEST_P224'), 'TEST_P224')
759
760  @mock.patch('builtins.input', return_value='')
761  def test_prompt_with_yn_result(self, mock_input):
762    """Test method of prompt_with_yn_result"""
763    msg = 'Do you want to continue?'
764    mock_input.return_value = ''
765    self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
766    self.assertFalse(atest_utils.prompt_with_yn_result(msg, False))
767    mock_input.return_value = 'y'
768    self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
769    mock_input.return_value = 'nO'
770    self.assertFalse(atest_utils.prompt_with_yn_result(msg, True))
771
772  def test_get_android_junit_config_filters(self):
773    """Test method of get_android_junit_config_filters"""
774    no_filter_test_config = os.path.join(
775        unittest_constants.TEST_DATA_DIR, 'filter_configs', 'no_filter.cfg'
776    )
777    self.assertEqual(
778        {}, atest_utils.get_android_junit_config_filters(no_filter_test_config)
779    )
780
781    filtered_test_config = os.path.join(
782        unittest_constants.TEST_DATA_DIR, 'filter_configs', 'filter.cfg'
783    )
784    filter_dict = atest_utils.get_android_junit_config_filters(
785        filtered_test_config
786    )
787    include_annotations = filter_dict.get(constants.INCLUDE_ANNOTATION)
788    include_annotations.sort()
789    self.assertEqual(['include1', 'include2'], include_annotations)
790    exclude_annotation = filter_dict.get(constants.EXCLUDE_ANNOTATION)
791    exclude_annotation.sort()
792    self.assertEqual(['exclude1', 'exclude2'], exclude_annotation)
793
794  def test_md5sum_file_existent(self):
795    """Test method of md5sum for an existent file."""
796    with tempfile.NamedTemporaryFile() as tmp_file:
797      with open(tmp_file.name, 'w', encoding='utf-8') as f:
798        f.write('some context')
799      expected_md5 = '6d583707b0149c07cc19a05f5fdc320c'
800
801      actual_md5 = atest_utils.md5sum(tmp_file.name)
802
803      self.assertEqual(actual_md5, expected_md5)
804
805  def test_md5sum_file_inexistent(self):
806    """Test method of md5sum for an inexistent file."""
807    inexistent_file = os.path.join('/somewhere/does/not/exist')
808    expected_md5 = ''
809
810    actual_md5 = atest_utils.md5sum(inexistent_file)
811
812    self.assertEqual(actual_md5, expected_md5)
813
814  def test_check_md5(self):
815    """Test method of check_md5"""
816    file1 = os.path.join(
817        unittest_constants.TEST_DATA_DIR, unittest_constants.JSON_FILE
818    )
819    checksum_file = '/tmp/_tmp_module-info.json'
820    atest_utils.save_md5([file1], '/tmp/_tmp_module-info.json')
821    self.assertTrue(atest_utils.check_md5(checksum_file))
822    os.remove(checksum_file)
823    self.assertFalse(atest_utils.check_md5(checksum_file))
824    self.assertTrue(atest_utils.check_md5(checksum_file, missing_ok=True))
825
826  def test_get_config_parameter(self):
827    """Test method of get_config_parameter"""
828    parameter_config = os.path.join(
829        unittest_constants.TEST_DATA_DIR, 'parameter_config', 'parameter.cfg'
830    )
831    no_parameter_config = os.path.join(
832        unittest_constants.TEST_DATA_DIR, 'parameter_config', 'no_parameter.cfg'
833    )
834
835    # Test parameter empty value
836    self.assertEqual(
837        set(), atest_utils.get_config_parameter(no_parameter_config)
838    )
839
840    # Test parameter empty value
841    self.assertEqual(
842        {'value_1', 'value_2', 'value_3', 'value_4'},
843        atest_utils.get_config_parameter(parameter_config),
844    )
845
846  def test_get_config_device(self):
847    """Test method of get_config_device"""
848    device_config = os.path.join(
849        unittest_constants.TEST_DATA_DIR,
850        'parameter_config',
851        'multiple_device.cfg',
852    )
853    self.assertEqual(
854        {'device_1', 'device_2'}, atest_utils.get_config_device(device_config)
855    )
856
857  def test_get_mainline_param(self):
858    """Test method of get_mainline_param"""
859    mainline_param_config = os.path.join(
860        unittest_constants.TEST_DATA_DIR,
861        'parameter_config',
862        'mainline_param.cfg',
863    )
864    self.assertEqual(
865        {'foo1.apex', 'foo2.apk+foo3.apk'},
866        atest_utils.get_mainline_param(mainline_param_config),
867    )
868    no_mainline_param_config = os.path.join(
869        unittest_constants.TEST_DATA_DIR, 'parameter_config', 'parameter.cfg'
870    )
871    self.assertEqual(
872        set(), atest_utils.get_mainline_param(no_mainline_param_config)
873    )
874
875  def test_get_full_annotation_class_name(self):
876    """Test method of get_full_annotation_class_name."""
877    app_mode_full = 'android.platform.test.annotations.AppModeFull'
878    presubmit = 'android.platform.test.annotations.Presubmit'
879    module_info = {
880        'srcs': [
881            os.path.join(
882                unittest_constants.TEST_DATA_DIR,
883                'annotation_testing',
884                'Annotation.src',
885            )
886        ]
887    }
888    # get annotation class from keyword
889    self.assertEqual(
890        atest_utils.get_full_annotation_class_name(module_info, 'presubmit'),
891        presubmit,
892    )
893    # get annotation class from an accurate fqcn keyword.
894    self.assertEqual(
895        atest_utils.get_full_annotation_class_name(module_info, presubmit),
896        presubmit,
897    )
898    # accept fqcn keyword in lowercase.
899    self.assertEqual(
900        atest_utils.get_full_annotation_class_name(
901            module_info, 'android.platform.test.annotations.presubmit'
902        ),
903        presubmit,
904    )
905    # unable to get annotation class from keyword.
906    self.assertNotEqual(
907        atest_utils.get_full_annotation_class_name(
908            module_info, 'appleModefull'
909        ),
910        app_mode_full,
911    )
912    # do not support partial-correct keyword.
913    self.assertNotEqual(
914        atest_utils.get_full_annotation_class_name(
915            module_info, 'android.platform.test.annotations.pres'
916        ),
917        presubmit,
918    )
919
920  def test_has_mixed_type_filters_one_module_with_one_type_return_false(self):
921    """Test method of has_mixed_type_filters"""
922    filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
923    test_data_1 = {constants.TI_FILTER: [filter_1]}
924    test_info_1 = test_info.TestInfo(
925        'MODULE', 'RUNNER', set(), test_data_1, 'SUITE', '', set()
926    )
927    self.assertFalse(atest_utils.has_mixed_type_filters([test_info_1]))
928
929  def test_has_mixed_type_filters_one_module_with_mixed_types_return_true(self):
930    """Test method of has_mixed_type_filters"""
931    filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
932    filter_2 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
933    test_data_2 = {constants.TI_FILTER: [filter_1, filter_2]}
934    test_info_2 = test_info.TestInfo(
935        'MODULE', 'RUNNER', set(), test_data_2, 'SUITE', '', set()
936    )
937    self.assertTrue(atest_utils.has_mixed_type_filters([test_info_2]))
938
939  def test_has_mixed_type_filters_two_module_with_mixed_types_return_false(
940      self,
941  ):
942    """Test method of has_mixed_type_filters"""
943    filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
944    test_data_1 = {constants.TI_FILTER: [filter_1]}
945    test_info_1 = test_info.TestInfo(
946        'MODULE', 'RUNNER', set(), test_data_1, 'SUITE', '', set()
947    )
948    filter_3 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
949    test_data_3 = {constants.TI_FILTER: [filter_3]}
950    test_info_3 = test_info.TestInfo(
951        'MODULE3', 'RUNNER', set(), test_data_3, 'SUITE', '', set()
952    )
953    self.assertFalse(
954        atest_utils.has_mixed_type_filters([test_info_1, test_info_3])
955    )
956
957  def test_get_filter_types(self):
958    """Test method of get_filter_types."""
959    filters = set(['CLASS#METHOD'])
960    expect_types = set([FilterType.REGULAR_FILTER.value])
961    self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
962
963    filters = set(['CLASS#METHOD*'])
964    expect_types = set([FilterType.WILDCARD_FILTER.value])
965    self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
966
967    filters = set(['CLASS#METHOD', 'CLASS#METHOD*'])
968    expect_types = set(
969        [FilterType.WILDCARD_FILTER.value, FilterType.REGULAR_FILTER.value]
970    )
971    self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
972
973    filters = set(['CLASS#METHOD?', 'CLASS#METHOD*'])
974    expect_types = set([FilterType.WILDCARD_FILTER.value])
975    self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
976
977  def test_get_bp_content(self):
978    """Method get_bp_content."""
979    # 1. "manifest" and "instrumentation_for" are defined.
980    content = """android_test    {
981                // comment
982                instrumentation_for: "AmSlam", // comment
983                manifest: "AndroidManifest-test.xml",
984                name: "AmSlamTests",
985        }"""
986    expected_result = {
987        'AmSlamTests': {
988            'target_module': 'AmSlam',
989            'manifest': 'AndroidManifest-test.xml',
990        }
991    }
992    temp_dir = tempfile.TemporaryDirectory()
993    tmpbp = Path(temp_dir.name).joinpath('Android.bp')
994    with open(tmpbp, 'w') as cache:
995      cache.write(content)
996    self.assertEqual(
997        atest_utils.get_bp_content(tmpbp, 'android_test'), expected_result
998    )
999    temp_dir.cleanup()
1000
1001    # 2. Only name is defined, will give default manifest and null
1002    # target_module.
1003    content = """android_app    {
1004                // comment
1005                name: "AmSlam",
1006                srcs: ["src1.java", "src2.java"]
1007        }"""
1008    expected_result = {
1009        'AmSlam': {'target_module': '', 'manifest': 'AndroidManifest.xml'}
1010    }
1011    temp_dir = tempfile.TemporaryDirectory()
1012    tmpbp = Path(temp_dir.name).joinpath('Android.bp')
1013    with open(tmpbp, 'w') as cache:
1014      cache.write(content)
1015    self.assertEqual(
1016        atest_utils.get_bp_content(tmpbp, 'android_app'), expected_result
1017    )
1018    temp_dir.cleanup()
1019
1020    # 3. Not even an Android.bp.
1021    content = """LOCAL_PATH := $(call my-dir)
1022                # comment
1023                include $(call all-subdir-makefiles)
1024                LOCAL_MODULE := atest_foo_test
1025        }"""
1026    temp_dir = tempfile.TemporaryDirectory()
1027    tmpbp = Path(temp_dir.name).joinpath('Android.mk')
1028    with open(tmpbp, 'w') as cache:
1029      cache.write(content)
1030    self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_app'), {})
1031    temp_dir.cleanup()
1032
1033  def test_get_manifest_info(self):
1034    """test get_manifest_info method."""
1035    # An instrumentation test:
1036    test_xml = os.path.join(
1037        unittest_constants.TEST_DATA_DIR,
1038        'foo/bar/AmSlam/test/AndroidManifest.xml',
1039    )
1040    expected = {
1041        'package': 'com.android.settings.tests.unit',
1042        'target_package': 'c0m.andr0id.settingS',
1043        'persistent': False,
1044    }
1045    self.assertEqual(expected, atest_utils.get_manifest_info(test_xml))
1046
1047    # A target module:
1048    target_xml = os.path.join(
1049        unittest_constants.TEST_DATA_DIR, 'foo/bar/AmSlam/AndroidManifest.xml'
1050    )
1051    expected = {
1052        'package': 'c0m.andr0id.settingS',
1053        'target_package': '',
1054        'persistent': False,
1055    }
1056    self.assertEqual(expected, atest_utils.get_manifest_info(target_xml))
1057
1058
1059class GetTradefedInvocationTimeTest(fake_filesystem_unittest.TestCase):
1060  """Tests of get_tradefed_invocation_time for various conditions."""
1061
1062  def setUp(self):
1063    self.setUpPyfakefs()
1064    self.log_path = '/somewhere/atest/log'
1065
1066  def test_get_tradefed_invocation_time_second_only(self):
1067    """Test the parser can handle second and millisecond properly."""
1068    end_host_log_file = Path(
1069        self.log_path,
1070        'inv_hashed_path',
1071        'inv_hashed_subpath',
1072        'end_host_log_test1.txt',
1073    )
1074    contents = """
1075=============== Consumed Time ==============
1076    x86_64 HelloWorldTests: 1s
1077    x86_64 hallo-welt: 768 ms
1078Total aggregated tests run time: 1s
1079============== Modules Preparation Times ==============
1080    x86_64 HelloWorldTests => prep = 2580 ms || clean = 298 ms
1081    x86_64 hallo-welt => prep = 1736 ms || clean = 243 ms
1082Total preparation time: 4s  ||  Total tear down time: 541 ms
1083=======================================================
1084=============== Summary ===============
1085Total Run time: 6s
10862/2 modules completed
1087Total Tests       : 3
1088PASSED            : 3
1089FAILED            : 0
1090============== End of Results =============="""
1091    self.fs.create_file(end_host_log_file, contents=contents)
1092    test = 1 * 1000 + 768
1093    prep = 2580 + 1736
1094    teardown = 298 + 243
1095    expected_elapsed_time = (test, prep, teardown)
1096
1097    actual_elapsed_time = atest_utils.get_tradefed_invocation_time(
1098        self.log_path
1099    )
1100
1101    self.assertEqual(actual_elapsed_time, expected_elapsed_time)
1102
1103  def test_get_tradefed_invocation_time_from_hours_to_milliseconds(self):
1104    """Test whether the parse can handle from hour to ms properly."""
1105    end_host_log_file = Path(
1106        self.log_path,
1107        'inv_hashed_path',
1108        'inv_hashed_subpath',
1109        'end_host_log_test2.txt',
1110    )
1111    contents = """
1112=============== Consumed Time ==============
1113    x86_64 HelloWorldTests: 27m 19s
1114    x86_64 hallo-welt: 3m 2s
1115Total aggregated tests run time: 31m
1116============== Modules Preparation Times ==============
1117    x86_64 HelloWorldTests => prep = 2580 ms || clean = 1298 ms
1118    x86_64 hallo-welt => prep = 1736 ms || clean = 1243 ms
1119Total preparation time: 1h 24m 17s ||  Total tear down time: 3s
1120=======================================================
1121=============== Summary ===============
1122Total Run time: 2h 5m 17s
11232/2 modules completed
1124Total Tests       : 3
1125PASSED            : 3
1126FAILED            : 0
1127============== End of Results =============="""
1128    self.fs.create_file(end_host_log_file, contents=contents)
1129    test = (27 * 60 + 19) * 1000 + (3 * 60 + 2) * 1000
1130    prep = 2580 + 1736
1131    teardown = 1298 + 1243
1132    expected_elapsed_time = (test, prep, teardown)
1133
1134    actual_elapsed_time = atest_utils.get_tradefed_invocation_time(
1135        self.log_path
1136    )
1137
1138    self.assertEqual(actual_elapsed_time, expected_elapsed_time)
1139
1140  def test_get_tradefed_invocation_time_null_result(self):
1141    """Test whether the parser returns null tuple when no keywords found."""
1142    end_host_log_file = Path(
1143        self.log_path,
1144        'inv_hashed_path',
1145        'inv_hashed_subpath',
1146        'end_host_log_test4.txt',
1147    )
1148    contents = 'some\ncontext'
1149    self.fs.create_file(end_host_log_file, contents=contents)
1150    expected_elapsed_time = (0, 0, 0)
1151
1152    actual_elapsed_time = atest_utils.get_tradefed_invocation_time(
1153        self.log_path
1154    )
1155
1156    self.assertEqual(actual_elapsed_time, expected_elapsed_time)
1157
1158
1159if __name__ == '__main__':
1160  unittest.main()
1161