1#!/usr/bin/env python3 2# 3# Copyright 2018, The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Unittests for atest_utils.""" 18 19# pylint: disable=invalid-name 20 21import hashlib 22from io import StringIO 23import os 24from pathlib import Path 25import subprocess 26import sys 27import tempfile 28import unittest 29from unittest import mock 30 31from atest import arg_parser 32from atest import atest_error 33from atest import atest_utils 34from atest import constants 35from atest import unittest_constants 36from atest import unittest_utils 37from atest.atest_enum import FilterType 38from atest.test_finders import test_info 39from pyfakefs import fake_filesystem_unittest 40 41TEST_MODULE_NAME_A = 'ModuleNameA' 42TEST_RUNNER_A = 'FakeTestRunnerA' 43TEST_BUILD_TARGET_A = set(['bt1', 'bt2']) 44TEST_DATA_A = {'test_data_a_1': 'a1', 'test_data_a_2': 'a2'} 45TEST_SUITE_A = 'FakeSuiteA' 46TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A' 47TEST_INSTALL_LOC_A = set(['host', 'device']) 48TEST_FINDER_A = 'MODULE' 49TEST_INFO_A = test_info.TestInfo( 50 TEST_MODULE_NAME_A, 51 TEST_RUNNER_A, 52 TEST_BUILD_TARGET_A, 53 TEST_DATA_A, 54 TEST_SUITE_A, 55 TEST_MODULE_CLASS_A, 56 TEST_INSTALL_LOC_A, 57) 58TEST_INFO_A.test_finder = TEST_FINDER_A 59TEST_ZIP_DATA_DIR = 'zip_files' 60TEST_SINGLE_ZIP_NAME = 'single_file.zip' 61TEST_MULTI_ZIP_NAME = 'multi_file.zip' 62 63REPO_INFO_OUTPUT = """Manifest branch: test_branch 64Manifest merge branch: refs/heads/test_branch 65Manifest groups: all,-notdefault 66---------------------------- 67""" 68 69 70class StreamIoOutputTest(unittest.TestCase): 71 """Class that tests the _stream_io_output function.""" 72 73 def test_stream_io_output_no_max_lines_no_clear_line_code(self): 74 """Test when max_lines is None, no clear line code is written to the stream.""" 75 io_input = StringIO() 76 io_input.write(f'1\n' * 10) 77 io_input.seek(0) 78 io_output = StringIO() 79 80 atest_utils.stream_io_output( 81 io_input, max_lines=None, io_output=io_output, is_io_output_atty=True 82 ) 83 84 self.assertNotIn( 85 atest_utils._BASH_CLEAR_PREVIOUS_LINE_CODE, io_output.getvalue() 86 ) 87 88 @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1)) 89 def test_stream_io_output_wrap_long_lines(self, _): 90 """Test when max_lines is set, long lines will be wrapped.""" 91 io_input = StringIO() 92 io_input.write(f'1' * 10) 93 io_input.seek(0) 94 io_output = StringIO() 95 96 atest_utils.stream_io_output( 97 io_input, max_lines=10, io_output=io_output, is_io_output_atty=True 98 ) 99 100 self.assertIn('11111\n11111', io_output.getvalue()) 101 102 @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1)) 103 def test_stream_io_output_clear_lines_over_max_lines(self, _): 104 """Test when line exceeds max_lines, the previous lines are cleared.""" 105 io_input = StringIO() 106 io_input.write('1\n2\n3\n') 107 io_input.seek(0) 108 io_output = StringIO() 109 110 atest_utils.stream_io_output( 111 io_input, max_lines=2, io_output=io_output, is_io_output_atty=True 112 ) 113 114 self.assertIn( 115 '2\n3\n', 116 io_output.getvalue(), 117 ) 118 self.assertNotIn( 119 '1\n2\n3\n', 120 io_output.getvalue(), 121 ) 122 123 @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1)) 124 def test_stream_io_output_no_clear_lines_under_max_lines(self, _): 125 """Test when line is under max_lines, the previous lines are not cleared.""" 126 io_input = StringIO() 127 io_input.write('1\n2\n3\n') 128 io_input.seek(0) 129 io_output = StringIO() 130 131 atest_utils.stream_io_output( 132 io_input, max_lines=4, io_output=io_output, is_io_output_atty=True 133 ) 134 135 self.assertIn( 136 '1\n2\n3\n', 137 io_output.getvalue(), 138 ) 139 140 @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1)) 141 def test_stream_io_output_no_lines_written_no_lines_cleared(self, _): 142 """Test when nothing is written, no lines are cleared.""" 143 io_input = StringIO() 144 io_output = StringIO() 145 146 atest_utils.stream_io_output( 147 io_input, max_lines=2, io_output=io_output, is_io_output_atty=True 148 ) 149 150 self.assertNotIn( 151 atest_utils._BASH_CLEAR_PREVIOUS_LINE_CODE, 152 io_output.getvalue(), 153 ) 154 155 @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1)) 156 def test_stream_io_output_replace_tab_with_spaces(self, _): 157 """Test when line exceeds max_lines, the previous lines are cleared.""" 158 io_input = StringIO() 159 io_input.write('1\t2') 160 io_input.seek(0) 161 io_output = StringIO() 162 163 atest_utils.stream_io_output( 164 io_input, max_lines=2, io_output=io_output, is_io_output_atty=True 165 ) 166 167 self.assertNotIn( 168 '\t', 169 io_output.getvalue(), 170 ) 171 172 173class ConcatenatePathTest(unittest.TestCase): 174 """Class that tests path concatenation.""" 175 176 @classmethod 177 def setUpClass(cls): 178 """Mock the environment variables for the entire test class""" 179 cls.build_top = '/src/build_top' 180 cls.prod_out = '/src/build_top/product_out' 181 cls.host_out = '/src/build_top/host_out' 182 cls.target_out_cases = '/src/build_top/product_out/testcases' 183 cls.host_out_cases = '/src/build_top/host_out/testcases' 184 cls.target_product = 'test_target_product' 185 cls.build_variant = 'test_build_variant' 186 cls.mock_getenv = mock.patch.dict( 187 os.environ, 188 { 189 'ANDROID_BUILD_TOP': cls.build_top, 190 'ANDROID_PRODUCT_OUT': cls.prod_out, 191 'ANDROID_TARGET_OUT_TESTCASES': cls.target_out_cases, 192 'ANDROID_HOST_OUT': cls.host_out, 193 'ANDROID_HOST_OUT_TESTCASES': cls.host_out_cases, 194 'TARGET_PRODUCT': cls.target_product, 195 'TARGET_BUILD_VARIANT': cls.build_variant, 196 }, 197 ) 198 cls.mock_getenv.start() 199 200 @classmethod 201 def tearDownClass(cls): 202 """Clean up the mocks after the test class finishes""" 203 cls.mock_getenv.stop() 204 205 def test_get_vars(self): 206 """Test the values of AndroidVariables are expected""" 207 variables = atest_utils.AndroidVariables() 208 209 self.assertEqual(variables.build_top, self.build_top) 210 self.assertEqual(variables.product_out, self.prod_out) 211 self.assertEqual(variables.target_out_cases, self.target_out_cases) 212 self.assertEqual(variables.host_out, self.host_out) 213 self.assertEqual(variables.host_out_cases, self.host_out_cases) 214 self.assertEqual(variables.target_product, self.target_product) 215 self.assertEqual(variables.build_variant, self.build_variant) 216 217 def test_atest_utils_get_build_top(self): 218 """Test concatenating strings with get_build_top().""" 219 expected_path = Path(self.build_top, 'path/to/project') 220 221 return_path = atest_utils.get_build_top('path/to/project') 222 223 self.assertEqual(expected_path, return_path) 224 225 def test_atest_utils_get_product_out(self): 226 """Test concatenating strings with get_product_out().""" 227 expected_path = Path(self.prod_out, 'module-info.json') 228 229 return_path = atest_utils.get_product_out('module-info.json') 230 231 self.assertEqual(expected_path, return_path) 232 233 def test_atest_utils_get_host_out(self): 234 """Test concatenating strings with get_host_out().""" 235 expected_path = Path(self.host_out, 'bin/adb') 236 237 return_path = atest_utils.get_host_out('bin', 'adb') 238 239 self.assertEqual(expected_path, return_path) 240 241 242class GetBuildOutDirTests(unittest.TestCase): 243 """Test get_build_out_dir() for various conditions.""" 244 245 def setUp(self) -> None: 246 self.abs_OUT_DIR = '/somewhere/out' 247 self.rel_OUT_DIR = 'somewhere/out' 248 self.abs_OUT_DIR_COMMON_BASE = '/somewhere/common_out' 249 self.rel_OUT_DIR_COMMON_BASE = 'somewhere/common_out' 250 251 def test_get_build_abs_out_dir(self): 252 """Test when OUT_DIR is an absolute path.""" 253 with mock.patch.dict( 254 'os.environ', 255 { 256 constants.ANDROID_BUILD_TOP: '/src/build/top', 257 'OUT_DIR': self.abs_OUT_DIR, 258 }, 259 ): 260 expected_out_dir = Path(self.abs_OUT_DIR) 261 262 returned_out_dir = atest_utils.get_build_out_dir() 263 264 self.assertEqual(expected_out_dir, returned_out_dir) 265 266 def test_get_build_rel_out_dir(self): 267 """Test when OUT_DIR is a relative path.""" 268 with mock.patch.dict( 269 'os.environ', 270 { 271 constants.ANDROID_BUILD_TOP: '/src/build/top', 272 'OUT_DIR': self.rel_OUT_DIR, 273 }, 274 ): 275 expected_out_dir = atest_utils.get_build_top(self.rel_OUT_DIR) 276 277 returned_out_dir = atest_utils.get_build_out_dir() 278 279 self.assertEqual(expected_out_dir, returned_out_dir) 280 281 def test_get_build_abs_out_dir_common_base(self): 282 """Test whe OUT_DIR_COMMON_BASE is an absolute path.""" 283 build_top_path = '/src/build/top' 284 branch_name = Path(build_top_path).name 285 with mock.patch.dict( 286 'os.environ', 287 { 288 constants.ANDROID_BUILD_TOP: build_top_path, 289 'OUT_DIR_COMMON_BASE': self.abs_OUT_DIR_COMMON_BASE, 290 }, 291 ): 292 expected_out_dir = Path(self.abs_OUT_DIR_COMMON_BASE, branch_name) 293 294 returned_out_dir = atest_utils.get_build_out_dir() 295 296 self.assertEqual(expected_out_dir, returned_out_dir) 297 298 def test_get_build_rel_out_dir_common_base(self): 299 """Test whe OUT_DIR_COMMON_BASE is a relative path.""" 300 build_top_path = '/src/build/top' 301 branch_name = Path(build_top_path).name 302 with mock.patch.dict( 303 'os.environ', 304 { 305 constants.ANDROID_BUILD_TOP: build_top_path, 306 'OUT_DIR_COMMON_BASE': self.rel_OUT_DIR_COMMON_BASE, 307 }, 308 ): 309 expected_out_dir = Path( 310 build_top_path, self.rel_OUT_DIR_COMMON_BASE, branch_name 311 ) 312 313 returned_out_dir = atest_utils.get_build_out_dir() 314 315 self.assertEqual(expected_out_dir, returned_out_dir) 316 317 def test_get_build_out_dir(self): 318 """Test when OUT_DIR and OUT_DIR_COMMON_BASE are null.""" 319 with mock.patch.dict( 320 'os.environ', {constants.ANDROID_BUILD_TOP: '/src/build/top'} 321 ): 322 expected_out_dir = atest_utils.get_build_top('out') 323 324 returned_out_dir = atest_utils.get_build_out_dir() 325 326 self.assertEqual(expected_out_dir, returned_out_dir) 327 328 329# pylint: disable=protected-access 330# pylint: disable=too-many-public-methods 331class AtestUtilsUnittests(unittest.TestCase): 332 """Unit tests for atest_utils.py""" 333 334 def test_capture_fail_section_has_fail_section(self): 335 """Test capture_fail_section when has fail section.""" 336 test_list = [ 337 'AAAAAA', 338 'FAILED: Error1', 339 '^\n', 340 'Error2\n', 341 '[ 6% 191/2997] BBBBBB\n', 342 'CCCCC', 343 '[ 20% 322/2997] DDDDDD\n', 344 'EEEEE', 345 ] 346 want_list = ['FAILED: Error1', '^\n', 'Error2\n'] 347 self.assertEqual(want_list, atest_utils._capture_fail_section(test_list)) 348 349 def test_capture_fail_section_no_fail_section(self): 350 """Test capture_fail_section when no fail section.""" 351 test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ'] 352 want_list = [] 353 self.assertEqual(want_list, atest_utils._capture_fail_section(test_list)) 354 355 def test_is_test_mapping_none_test_mapping_args(self): 356 """Test method is_test_mapping.""" 357 non_tm_args = ['--host-unit-test-only'] 358 359 for argument in non_tm_args: 360 args = arg_parser.create_atest_arg_parser().parse_args([argument]) 361 self.assertFalse( 362 atest_utils.is_test_mapping(args), 363 'Option %s indicates NOT a test_mapping!' % argument, 364 ) 365 366 def test_is_test_mapping_test_mapping_args(self): 367 """Test method is_test_mapping.""" 368 tm_args = ['--test-mapping', '--include-subdirs'] 369 370 for argument in tm_args: 371 args = arg_parser.create_atest_arg_parser().parse_args([argument]) 372 self.assertTrue( 373 atest_utils.is_test_mapping(args), 374 'Option %s indicates a test_mapping!' % argument, 375 ) 376 377 def test_is_test_mapping_implicit_test_mapping(self): 378 """Test method is_test_mapping.""" 379 args = arg_parser.create_atest_arg_parser().parse_args( 380 ['--test', '--build', ':postsubmit'] 381 ) 382 self.assertTrue( 383 atest_utils.is_test_mapping(args), 384 'Option %s indicates a test_mapping!' % args, 385 ) 386 387 def test_is_test_mapping_with_testname(self): 388 """Test method is_test_mapping.""" 389 irrelevant_args = ['--test', ':postsubmit', 'testname'] 390 391 args = arg_parser.create_atest_arg_parser().parse_args(irrelevant_args) 392 self.assertFalse( 393 atest_utils.is_test_mapping(args), 394 'Option %s indicates a test_mapping!' % args, 395 ) 396 397 def test_is_test_mapping_false(self): 398 """Test method is_test_mapping.""" 399 args = arg_parser.create_atest_arg_parser().parse_args( 400 ['--test', '--build', 'hello_atest'] 401 ) 402 403 self.assertFalse(atest_utils.is_test_mapping(args)) 404 405 def test_has_colors(self): 406 """Test method _has_colors.""" 407 # stream is file I/O 408 stream = open('/tmp/test_has_colors.txt', 'wb') 409 self.assertFalse(atest_utils._has_colors(stream)) 410 stream.close() 411 412 # stream is not a tty(terminal). 413 stream = mock.Mock() 414 stream.isatty.return_value = False 415 self.assertFalse(atest_utils._has_colors(stream)) 416 417 # stream is a tty(terminal). 418 stream = mock.Mock() 419 stream.isatty.return_value = True 420 self.assertTrue(atest_utils._has_colors(stream)) 421 422 @mock.patch('atest.atest_utils._has_colors') 423 def test_colorize(self, mock_has_colors): 424 """Test method colorize.""" 425 original_str = 'test string' 426 green_no = 2 427 428 # _has_colors() return False. 429 mock_has_colors.return_value = False 430 converted_str = atest_utils.colorize( 431 original_str, green_no, bp_color=constants.RED 432 ) 433 self.assertEqual(original_str, converted_str) 434 435 # Green text with red background. 436 mock_has_colors.return_value = True 437 converted_str = atest_utils.colorize( 438 original_str, green_no, bp_color=constants.RED 439 ) 440 green_highlight_string = '\x1b[1;32;41m%s\x1b[0m' % original_str 441 self.assertEqual(green_highlight_string, converted_str) 442 443 # Green text, no background. 444 mock_has_colors.return_value = True 445 converted_str = atest_utils.colorize(original_str, green_no) 446 green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str 447 self.assertEqual(green_no_highlight_string, converted_str) 448 449 @mock.patch('atest.atest_utils.colorful_print') 450 @mock.patch('logging.error') 451 def test_print_and_log_error_no_format_prints_and_logs( 452 self, mocked_print, locked_error_logging 453 ): 454 atest_utils.print_and_log_error('no format') 455 456 mocked_print.assert_called_once() 457 locked_error_logging.assert_called_once() 458 459 @mock.patch('atest.atest_utils.colorful_print') 460 def test_print_and_log_error_single_non_string_prints(self, mocked_print): 461 atest_utils.print_and_log_error(123) 462 463 mocked_print.assert_called_once() 464 465 @mock.patch('atest.atest_utils.colorful_print') 466 def test_print_and_log_error_with_format_prints(self, mocked_print): 467 atest_utils.print_and_log_error('1+1=%s', 2) 468 469 mocked_print.assert_called_once() 470 471 @mock.patch('atest.atest_utils.colorful_print') 472 def test_print_and_log_error_bad_value_no_throw_no_print(self, mocked_print): 473 atest_utils.print_and_log_error('bad format %', 'format arg') 474 475 mocked_print.assert_not_called() 476 477 @mock.patch('atest.atest_utils.colorful_print') 478 def test_print_and_log_error_missing_format_arg_no_print(self, mocked_print): 479 atest_utils.print_and_log_error('bad format %s %s', 'format arg') 480 481 mocked_print.assert_not_called() 482 483 @mock.patch('atest.atest_utils.colorful_print') 484 def test_print_and_log_error_extra_format_arg_no_print(self, mocked_print): 485 atest_utils.print_and_log_error( 486 'bad format %s', 'format arg1', 'format arg2' 487 ) 488 489 mocked_print.assert_not_called() 490 491 @mock.patch('atest.atest_utils._has_colors') 492 def test_colorful_print(self, mock_has_colors): 493 """Test method colorful_print.""" 494 testing_str = 'color_print_test' 495 green_no = 2 496 497 # _has_colors() return False. 498 mock_has_colors.return_value = False 499 capture_output = StringIO() 500 sys.stdout = capture_output 501 atest_utils.colorful_print( 502 testing_str, green_no, bp_color=constants.RED, auto_wrap=False 503 ) 504 sys.stdout = sys.__stdout__ 505 uncolored_string = testing_str 506 self.assertEqual(capture_output.getvalue(), uncolored_string) 507 508 # Green text with red background, but no wrap. 509 mock_has_colors.return_value = True 510 capture_output = StringIO() 511 sys.stdout = capture_output 512 atest_utils.colorful_print( 513 testing_str, green_no, bp_color=constants.RED, auto_wrap=False 514 ) 515 sys.stdout = sys.__stdout__ 516 green_highlight_no_wrap_string = '\x1b[1;32;41m%s\x1b[0m' % testing_str 517 self.assertEqual(capture_output.getvalue(), green_highlight_no_wrap_string) 518 519 # Green text, no background, no wrap. 520 mock_has_colors.return_value = True 521 capture_output = StringIO() 522 sys.stdout = capture_output 523 atest_utils.colorful_print(testing_str, green_no, auto_wrap=False) 524 sys.stdout = sys.__stdout__ 525 green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str 526 self.assertEqual(capture_output.getvalue(), green_no_high_no_wrap_string) 527 528 # Green text with red background and wrap. 529 mock_has_colors.return_value = True 530 capture_output = StringIO() 531 sys.stdout = capture_output 532 atest_utils.colorful_print( 533 testing_str, green_no, bp_color=constants.RED, auto_wrap=True 534 ) 535 sys.stdout = sys.__stdout__ 536 green_highlight_wrap_string = '\x1b[1;32;41m%s\x1b[0m\n' % testing_str 537 self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string) 538 539 # Green text with wrap, but no background. 540 mock_has_colors.return_value = True 541 capture_output = StringIO() 542 sys.stdout = capture_output 543 atest_utils.colorful_print(testing_str, green_no, auto_wrap=True) 544 sys.stdout = sys.__stdout__ 545 green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str 546 self.assertEqual(capture_output.getvalue(), green_wrap_no_highlight_string) 547 548 def test_is_supported_mainline_module(self): 549 """Test the installed artifacts are supported.""" 550 self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apk')) 551 self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apks')) 552 self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apex')) 553 self.assertFalse(atest_utils.is_supported_mainline_module('out/foo.capex')) 554 555 def test_get_test_and_mainline_modules(self): 556 """Test whether the given test reference is a mainline module test.""" 557 # regular test. 558 self.assertIsNone(atest_utils.get_test_and_mainline_modules('test_name')) 559 # missing trailing bracket. 560 self.assertIsNone( 561 atest_utils.get_test_and_mainline_modules('test_name[foo.apk+goo.apex') 562 ) 563 # valid mainline module syntax 564 self.assertIsNotNone( 565 atest_utils.get_test_and_mainline_modules('test_name[foo.apk]') 566 ) 567 self.assertIsNotNone( 568 atest_utils.get_test_and_mainline_modules('test_name[foo.apk+goo.apex]') 569 ) 570 571 def test_get_test_info_cache_path(self): 572 """Test method get_test_info_cache_path.""" 573 input_file_name = 'mytest_name' 574 cache_root = '/a/b/c' 575 expect_hashed_name = ( 576 '%s.cache' % hashlib.md5(str(input_file_name).encode()).hexdigest() 577 ) 578 self.assertEqual( 579 os.path.join(cache_root, expect_hashed_name), 580 atest_utils.get_test_info_cache_path(input_file_name, cache_root), 581 ) 582 583 def test_get_and_load_cache(self): 584 """Test method update_test_info_cache and load_test_info_cache.""" 585 test_reference = 'myTestRefA' 586 test_cache_dir = tempfile.mkdtemp() 587 atest_utils.update_test_info_cache( 588 test_reference, [TEST_INFO_A], test_cache_dir 589 ) 590 unittest_utils.assert_equal_testinfo_sets( 591 self, 592 set([TEST_INFO_A]), 593 atest_utils.load_test_info_cache(test_reference, test_cache_dir), 594 ) 595 596 @mock.patch('subprocess.check_output') 597 def test_get_modified_files(self, mock_co): 598 """Test method get_modified_files""" 599 mock_co.side_effect = [ 600 x.encode('utf-8') 601 for x in ['/a/b/', '\n', 'test_fp1.java\nc/test_fp2.java'] 602 ] 603 self.assertEqual( 604 {'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'}, 605 atest_utils.get_modified_files(''), 606 ) 607 mock_co.side_effect = [ 608 x.encode('utf-8') for x in ['/a/b/', 'test_fp4', '/test_fp3.java'] 609 ] 610 self.assertEqual( 611 {'/a/b/test_fp4', '/a/b/test_fp3.java'}, 612 atest_utils.get_modified_files(''), 613 ) 614 615 def test_delimiter(self): 616 """Test method delimiter""" 617 self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2)) 618 619 def test_has_python_module(self): 620 """Test method has_python_module""" 621 self.assertFalse(atest_utils.has_python_module('M_M')) 622 self.assertTrue(atest_utils.has_python_module('os')) 623 624 @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True) 625 def test_read_zip_single_text(self, _matched): 626 """Test method extract_zip_text include only one text file.""" 627 zip_path = os.path.join( 628 unittest_constants.TEST_DATA_DIR, 629 TEST_ZIP_DATA_DIR, 630 TEST_SINGLE_ZIP_NAME, 631 ) 632 expect_content = '\nfile1_line1\nfile1_line2\n' 633 self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path)) 634 635 @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True) 636 def test_read_zip_multi_text(self, _matched): 637 """Test method extract_zip_text include multiple text files.""" 638 zip_path = os.path.join( 639 unittest_constants.TEST_DATA_DIR, TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME 640 ) 641 expect_content = '\nfile1_line1\nfile1_line2\n\nfile2_line1\nfile2_line2\n' 642 self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path)) 643 644 def test_matched_tf_error_log(self): 645 """Test method extract_zip_text include multiple text files.""" 646 matched_content = '05-25 17:37:04 E/XXXXX YYYYY' 647 not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY' 648 # Test matched content 649 self.assertEqual(True, atest_utils.matched_tf_error_log(matched_content)) 650 # Test not matched content 651 self.assertEqual( 652 False, atest_utils.matched_tf_error_log(not_matched_content) 653 ) 654 655 # pylint: disable=no-member 656 def test_read_test_record_proto(self): 657 """Test method read_test_record.""" 658 test_record_file_path = os.path.join( 659 unittest_constants.TEST_DATA_DIR, 'test_record.proto.testonly' 660 ) 661 test_record = atest_utils.read_test_record(test_record_file_path) 662 self.assertEqual( 663 test_record.children[0].inline_test_record.test_record_id, 664 'x86 hello_world_test', 665 ) 666 667 def test_load_json_safely_file_inexistent(self): 668 """Test method load_json_safely if file does not exist.""" 669 json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath( 670 'not_exist.json' 671 ) 672 self.assertEqual({}, atest_utils.load_json_safely(json_file_path)) 673 674 def test_load_json_safely_valid_json_format(self): 675 """Test method load_json_safely if file exists and format is valid.""" 676 json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath( 677 'module-info.json' 678 ) 679 content = atest_utils.load_json_safely(json_file_path) 680 self.assertEqual( 681 'MainModule1', content.get('MainModule1').get('module_name') 682 ) 683 self.assertEqual( 684 [], content.get('MainModule2').get('test_mainline_modules') 685 ) 686 687 def test_load_json_safely_invalid_json_format(self): 688 """Test method load_json_safely if file exist but content is invalid.""" 689 json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath( 690 'not-valid-module-info.json' 691 ) 692 self.assertEqual({}, atest_utils.load_json_safely(json_file_path)) 693 694 @mock.patch('os.getenv') 695 def test_get_manifest_branch(self, mock_env): 696 """Test method get_manifest_branch""" 697 build_top = tempfile.TemporaryDirectory() 698 mock_env.return_value = build_top.name 699 repo_dir = Path(build_top.name).joinpath('.repo') 700 portal_xml = repo_dir.joinpath('manifest.xml') 701 manifest_dir = repo_dir.joinpath('manifests') 702 target_xml = manifest_dir.joinpath('Default.xml') 703 repo_dir.mkdir() 704 manifest_dir.mkdir() 705 content_portal = '<manifest><include name="Default.xml" /></manifest>' 706 content_manifest = """<manifest> 707 <remote name="aosp" fetch=".." review="https://android-review.googlesource.com/" /> 708 <default revision="MONSTER-dev" remote="aosp" sync-j="4" /> 709 </manifest>""" 710 711 # 1. The manifest.xml(portal) contains 'include' directive: 'Default.xml'. 712 # Search revision in .repo/manifests/Default.xml. 713 with open(portal_xml, 'w') as cache: 714 cache.write(content_portal) 715 with open(target_xml, 'w') as cache: 716 cache.write(content_manifest) 717 self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch()) 718 self.assertEqual('aosp-MONSTER-dev', atest_utils.get_manifest_branch(True)) 719 os.remove(target_xml) 720 os.remove(portal_xml) 721 722 # 2. The manifest.xml contains neither 'include' nor 'revision' directive, 723 # keep searching revision in .repo/manifests/default.xml by default. 724 with open(portal_xml, 'w') as cache: 725 cache.write('<manifest></manifest>') 726 default_xml = manifest_dir.joinpath('default.xml') 727 with open(default_xml, 'w') as cache: 728 cache.write(content_manifest) 729 self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch()) 730 os.remove(default_xml) 731 os.remove(portal_xml) 732 733 # 3. revision was directly defined in 'manifest.xml'. 734 with open(portal_xml, 'w') as cache: 735 cache.write(content_manifest) 736 self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch()) 737 os.remove(portal_xml) 738 739 # 4. Return None if the included xml does not exist. 740 with open(portal_xml, 'w') as cache: 741 cache.write(content_portal) 742 self.assertEqual('', atest_utils.get_manifest_branch()) 743 os.remove(portal_xml) 744 745 def test_has_wildcard(self): 746 """Test method of has_wildcard""" 747 self.assertFalse(atest_utils.has_wildcard('test1')) 748 self.assertFalse(atest_utils.has_wildcard(['test1'])) 749 self.assertTrue(atest_utils.has_wildcard('test1?')) 750 self.assertTrue(atest_utils.has_wildcard(['test1', 'b*', 'a?b*'])) 751 752 # pylint: disable=anomalous-backslash-in-string 753 def test_quote(self): 754 """Test method of quote()""" 755 target_str = r'TEST_(F|P)[0-9].*\w$' 756 expected_str = "'TEST_(F|P)[0-9].*\w$'" 757 self.assertEqual(atest_utils.quote(target_str), expected_str) 758 self.assertEqual(atest_utils.quote('TEST_P224'), 'TEST_P224') 759 760 @mock.patch('builtins.input', return_value='') 761 def test_prompt_with_yn_result(self, mock_input): 762 """Test method of prompt_with_yn_result""" 763 msg = 'Do you want to continue?' 764 mock_input.return_value = '' 765 self.assertTrue(atest_utils.prompt_with_yn_result(msg, True)) 766 self.assertFalse(atest_utils.prompt_with_yn_result(msg, False)) 767 mock_input.return_value = 'y' 768 self.assertTrue(atest_utils.prompt_with_yn_result(msg, True)) 769 mock_input.return_value = 'nO' 770 self.assertFalse(atest_utils.prompt_with_yn_result(msg, True)) 771 772 def test_get_android_junit_config_filters(self): 773 """Test method of get_android_junit_config_filters""" 774 no_filter_test_config = os.path.join( 775 unittest_constants.TEST_DATA_DIR, 'filter_configs', 'no_filter.cfg' 776 ) 777 self.assertEqual( 778 {}, atest_utils.get_android_junit_config_filters(no_filter_test_config) 779 ) 780 781 filtered_test_config = os.path.join( 782 unittest_constants.TEST_DATA_DIR, 'filter_configs', 'filter.cfg' 783 ) 784 filter_dict = atest_utils.get_android_junit_config_filters( 785 filtered_test_config 786 ) 787 include_annotations = filter_dict.get(constants.INCLUDE_ANNOTATION) 788 include_annotations.sort() 789 self.assertEqual(['include1', 'include2'], include_annotations) 790 exclude_annotation = filter_dict.get(constants.EXCLUDE_ANNOTATION) 791 exclude_annotation.sort() 792 self.assertEqual(['exclude1', 'exclude2'], exclude_annotation) 793 794 def test_md5sum_file_existent(self): 795 """Test method of md5sum for an existent file.""" 796 with tempfile.NamedTemporaryFile() as tmp_file: 797 with open(tmp_file.name, 'w', encoding='utf-8') as f: 798 f.write('some context') 799 expected_md5 = '6d583707b0149c07cc19a05f5fdc320c' 800 801 actual_md5 = atest_utils.md5sum(tmp_file.name) 802 803 self.assertEqual(actual_md5, expected_md5) 804 805 def test_md5sum_file_inexistent(self): 806 """Test method of md5sum for an inexistent file.""" 807 inexistent_file = os.path.join('/somewhere/does/not/exist') 808 expected_md5 = '' 809 810 actual_md5 = atest_utils.md5sum(inexistent_file) 811 812 self.assertEqual(actual_md5, expected_md5) 813 814 def test_check_md5(self): 815 """Test method of check_md5""" 816 file1 = os.path.join( 817 unittest_constants.TEST_DATA_DIR, unittest_constants.JSON_FILE 818 ) 819 checksum_file = '/tmp/_tmp_module-info.json' 820 atest_utils.save_md5([file1], '/tmp/_tmp_module-info.json') 821 self.assertTrue(atest_utils.check_md5(checksum_file)) 822 os.remove(checksum_file) 823 self.assertFalse(atest_utils.check_md5(checksum_file)) 824 self.assertTrue(atest_utils.check_md5(checksum_file, missing_ok=True)) 825 826 def test_get_config_parameter(self): 827 """Test method of get_config_parameter""" 828 parameter_config = os.path.join( 829 unittest_constants.TEST_DATA_DIR, 'parameter_config', 'parameter.cfg' 830 ) 831 no_parameter_config = os.path.join( 832 unittest_constants.TEST_DATA_DIR, 'parameter_config', 'no_parameter.cfg' 833 ) 834 835 # Test parameter empty value 836 self.assertEqual( 837 set(), atest_utils.get_config_parameter(no_parameter_config) 838 ) 839 840 # Test parameter empty value 841 self.assertEqual( 842 {'value_1', 'value_2', 'value_3', 'value_4'}, 843 atest_utils.get_config_parameter(parameter_config), 844 ) 845 846 def test_get_config_device(self): 847 """Test method of get_config_device""" 848 device_config = os.path.join( 849 unittest_constants.TEST_DATA_DIR, 850 'parameter_config', 851 'multiple_device.cfg', 852 ) 853 self.assertEqual( 854 {'device_1', 'device_2'}, atest_utils.get_config_device(device_config) 855 ) 856 857 def test_get_mainline_param(self): 858 """Test method of get_mainline_param""" 859 mainline_param_config = os.path.join( 860 unittest_constants.TEST_DATA_DIR, 861 'parameter_config', 862 'mainline_param.cfg', 863 ) 864 self.assertEqual( 865 {'foo1.apex', 'foo2.apk+foo3.apk'}, 866 atest_utils.get_mainline_param(mainline_param_config), 867 ) 868 no_mainline_param_config = os.path.join( 869 unittest_constants.TEST_DATA_DIR, 'parameter_config', 'parameter.cfg' 870 ) 871 self.assertEqual( 872 set(), atest_utils.get_mainline_param(no_mainline_param_config) 873 ) 874 875 def test_get_full_annotation_class_name(self): 876 """Test method of get_full_annotation_class_name.""" 877 app_mode_full = 'android.platform.test.annotations.AppModeFull' 878 presubmit = 'android.platform.test.annotations.Presubmit' 879 module_info = { 880 'srcs': [ 881 os.path.join( 882 unittest_constants.TEST_DATA_DIR, 883 'annotation_testing', 884 'Annotation.src', 885 ) 886 ] 887 } 888 # get annotation class from keyword 889 self.assertEqual( 890 atest_utils.get_full_annotation_class_name(module_info, 'presubmit'), 891 presubmit, 892 ) 893 # get annotation class from an accurate fqcn keyword. 894 self.assertEqual( 895 atest_utils.get_full_annotation_class_name(module_info, presubmit), 896 presubmit, 897 ) 898 # accept fqcn keyword in lowercase. 899 self.assertEqual( 900 atest_utils.get_full_annotation_class_name( 901 module_info, 'android.platform.test.annotations.presubmit' 902 ), 903 presubmit, 904 ) 905 # unable to get annotation class from keyword. 906 self.assertNotEqual( 907 atest_utils.get_full_annotation_class_name( 908 module_info, 'appleModefull' 909 ), 910 app_mode_full, 911 ) 912 # do not support partial-correct keyword. 913 self.assertNotEqual( 914 atest_utils.get_full_annotation_class_name( 915 module_info, 'android.platform.test.annotations.pres' 916 ), 917 presubmit, 918 ) 919 920 def test_has_mixed_type_filters_one_module_with_one_type_return_false(self): 921 """Test method of has_mixed_type_filters""" 922 filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD'])) 923 test_data_1 = {constants.TI_FILTER: [filter_1]} 924 test_info_1 = test_info.TestInfo( 925 'MODULE', 'RUNNER', set(), test_data_1, 'SUITE', '', set() 926 ) 927 self.assertFalse(atest_utils.has_mixed_type_filters([test_info_1])) 928 929 def test_has_mixed_type_filters_one_module_with_mixed_types_return_true(self): 930 """Test method of has_mixed_type_filters""" 931 filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD'])) 932 filter_2 = test_info.TestFilter('CLASS', frozenset(['METHOD*'])) 933 test_data_2 = {constants.TI_FILTER: [filter_1, filter_2]} 934 test_info_2 = test_info.TestInfo( 935 'MODULE', 'RUNNER', set(), test_data_2, 'SUITE', '', set() 936 ) 937 self.assertTrue(atest_utils.has_mixed_type_filters([test_info_2])) 938 939 def test_has_mixed_type_filters_two_module_with_mixed_types_return_false( 940 self, 941 ): 942 """Test method of has_mixed_type_filters""" 943 filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD'])) 944 test_data_1 = {constants.TI_FILTER: [filter_1]} 945 test_info_1 = test_info.TestInfo( 946 'MODULE', 'RUNNER', set(), test_data_1, 'SUITE', '', set() 947 ) 948 filter_3 = test_info.TestFilter('CLASS', frozenset(['METHOD*'])) 949 test_data_3 = {constants.TI_FILTER: [filter_3]} 950 test_info_3 = test_info.TestInfo( 951 'MODULE3', 'RUNNER', set(), test_data_3, 'SUITE', '', set() 952 ) 953 self.assertFalse( 954 atest_utils.has_mixed_type_filters([test_info_1, test_info_3]) 955 ) 956 957 def test_get_filter_types(self): 958 """Test method of get_filter_types.""" 959 filters = set(['CLASS#METHOD']) 960 expect_types = set([FilterType.REGULAR_FILTER.value]) 961 self.assertEqual(atest_utils.get_filter_types(filters), expect_types) 962 963 filters = set(['CLASS#METHOD*']) 964 expect_types = set([FilterType.WILDCARD_FILTER.value]) 965 self.assertEqual(atest_utils.get_filter_types(filters), expect_types) 966 967 filters = set(['CLASS#METHOD', 'CLASS#METHOD*']) 968 expect_types = set( 969 [FilterType.WILDCARD_FILTER.value, FilterType.REGULAR_FILTER.value] 970 ) 971 self.assertEqual(atest_utils.get_filter_types(filters), expect_types) 972 973 filters = set(['CLASS#METHOD?', 'CLASS#METHOD*']) 974 expect_types = set([FilterType.WILDCARD_FILTER.value]) 975 self.assertEqual(atest_utils.get_filter_types(filters), expect_types) 976 977 def test_get_bp_content(self): 978 """Method get_bp_content.""" 979 # 1. "manifest" and "instrumentation_for" are defined. 980 content = """android_test { 981 // comment 982 instrumentation_for: "AmSlam", // comment 983 manifest: "AndroidManifest-test.xml", 984 name: "AmSlamTests", 985 }""" 986 expected_result = { 987 'AmSlamTests': { 988 'target_module': 'AmSlam', 989 'manifest': 'AndroidManifest-test.xml', 990 } 991 } 992 temp_dir = tempfile.TemporaryDirectory() 993 tmpbp = Path(temp_dir.name).joinpath('Android.bp') 994 with open(tmpbp, 'w') as cache: 995 cache.write(content) 996 self.assertEqual( 997 atest_utils.get_bp_content(tmpbp, 'android_test'), expected_result 998 ) 999 temp_dir.cleanup() 1000 1001 # 2. Only name is defined, will give default manifest and null 1002 # target_module. 1003 content = """android_app { 1004 // comment 1005 name: "AmSlam", 1006 srcs: ["src1.java", "src2.java"] 1007 }""" 1008 expected_result = { 1009 'AmSlam': {'target_module': '', 'manifest': 'AndroidManifest.xml'} 1010 } 1011 temp_dir = tempfile.TemporaryDirectory() 1012 tmpbp = Path(temp_dir.name).joinpath('Android.bp') 1013 with open(tmpbp, 'w') as cache: 1014 cache.write(content) 1015 self.assertEqual( 1016 atest_utils.get_bp_content(tmpbp, 'android_app'), expected_result 1017 ) 1018 temp_dir.cleanup() 1019 1020 # 3. Not even an Android.bp. 1021 content = """LOCAL_PATH := $(call my-dir) 1022 # comment 1023 include $(call all-subdir-makefiles) 1024 LOCAL_MODULE := atest_foo_test 1025 }""" 1026 temp_dir = tempfile.TemporaryDirectory() 1027 tmpbp = Path(temp_dir.name).joinpath('Android.mk') 1028 with open(tmpbp, 'w') as cache: 1029 cache.write(content) 1030 self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_app'), {}) 1031 temp_dir.cleanup() 1032 1033 def test_get_manifest_info(self): 1034 """test get_manifest_info method.""" 1035 # An instrumentation test: 1036 test_xml = os.path.join( 1037 unittest_constants.TEST_DATA_DIR, 1038 'foo/bar/AmSlam/test/AndroidManifest.xml', 1039 ) 1040 expected = { 1041 'package': 'com.android.settings.tests.unit', 1042 'target_package': 'c0m.andr0id.settingS', 1043 'persistent': False, 1044 } 1045 self.assertEqual(expected, atest_utils.get_manifest_info(test_xml)) 1046 1047 # A target module: 1048 target_xml = os.path.join( 1049 unittest_constants.TEST_DATA_DIR, 'foo/bar/AmSlam/AndroidManifest.xml' 1050 ) 1051 expected = { 1052 'package': 'c0m.andr0id.settingS', 1053 'target_package': '', 1054 'persistent': False, 1055 } 1056 self.assertEqual(expected, atest_utils.get_manifest_info(target_xml)) 1057 1058 1059class GetTradefedInvocationTimeTest(fake_filesystem_unittest.TestCase): 1060 """Tests of get_tradefed_invocation_time for various conditions.""" 1061 1062 def setUp(self): 1063 self.setUpPyfakefs() 1064 self.log_path = '/somewhere/atest/log' 1065 1066 def test_get_tradefed_invocation_time_second_only(self): 1067 """Test the parser can handle second and millisecond properly.""" 1068 end_host_log_file = Path( 1069 self.log_path, 1070 'inv_hashed_path', 1071 'inv_hashed_subpath', 1072 'end_host_log_test1.txt', 1073 ) 1074 contents = """ 1075=============== Consumed Time ============== 1076 x86_64 HelloWorldTests: 1s 1077 x86_64 hallo-welt: 768 ms 1078Total aggregated tests run time: 1s 1079============== Modules Preparation Times ============== 1080 x86_64 HelloWorldTests => prep = 2580 ms || clean = 298 ms 1081 x86_64 hallo-welt => prep = 1736 ms || clean = 243 ms 1082Total preparation time: 4s || Total tear down time: 541 ms 1083======================================================= 1084=============== Summary =============== 1085Total Run time: 6s 10862/2 modules completed 1087Total Tests : 3 1088PASSED : 3 1089FAILED : 0 1090============== End of Results ==============""" 1091 self.fs.create_file(end_host_log_file, contents=contents) 1092 test = 1 * 1000 + 768 1093 prep = 2580 + 1736 1094 teardown = 298 + 243 1095 expected_elapsed_time = (test, prep, teardown) 1096 1097 actual_elapsed_time = atest_utils.get_tradefed_invocation_time( 1098 self.log_path 1099 ) 1100 1101 self.assertEqual(actual_elapsed_time, expected_elapsed_time) 1102 1103 def test_get_tradefed_invocation_time_from_hours_to_milliseconds(self): 1104 """Test whether the parse can handle from hour to ms properly.""" 1105 end_host_log_file = Path( 1106 self.log_path, 1107 'inv_hashed_path', 1108 'inv_hashed_subpath', 1109 'end_host_log_test2.txt', 1110 ) 1111 contents = """ 1112=============== Consumed Time ============== 1113 x86_64 HelloWorldTests: 27m 19s 1114 x86_64 hallo-welt: 3m 2s 1115Total aggregated tests run time: 31m 1116============== Modules Preparation Times ============== 1117 x86_64 HelloWorldTests => prep = 2580 ms || clean = 1298 ms 1118 x86_64 hallo-welt => prep = 1736 ms || clean = 1243 ms 1119Total preparation time: 1h 24m 17s || Total tear down time: 3s 1120======================================================= 1121=============== Summary =============== 1122Total Run time: 2h 5m 17s 11232/2 modules completed 1124Total Tests : 3 1125PASSED : 3 1126FAILED : 0 1127============== End of Results ==============""" 1128 self.fs.create_file(end_host_log_file, contents=contents) 1129 test = (27 * 60 + 19) * 1000 + (3 * 60 + 2) * 1000 1130 prep = 2580 + 1736 1131 teardown = 1298 + 1243 1132 expected_elapsed_time = (test, prep, teardown) 1133 1134 actual_elapsed_time = atest_utils.get_tradefed_invocation_time( 1135 self.log_path 1136 ) 1137 1138 self.assertEqual(actual_elapsed_time, expected_elapsed_time) 1139 1140 def test_get_tradefed_invocation_time_null_result(self): 1141 """Test whether the parser returns null tuple when no keywords found.""" 1142 end_host_log_file = Path( 1143 self.log_path, 1144 'inv_hashed_path', 1145 'inv_hashed_subpath', 1146 'end_host_log_test4.txt', 1147 ) 1148 contents = 'some\ncontext' 1149 self.fs.create_file(end_host_log_file, contents=contents) 1150 expected_elapsed_time = (0, 0, 0) 1151 1152 actual_elapsed_time = atest_utils.get_tradefed_invocation_time( 1153 self.log_path 1154 ) 1155 1156 self.assertEqual(actual_elapsed_time, expected_elapsed_time) 1157 1158 1159if __name__ == '__main__': 1160 unittest.main() 1161