1#!/usr/bin/python3 2 3# pylint: disable=missing-docstring 4 5from __future__ import absolute_import, division, print_function 6 7import errno 8import io 9import itertools 10import logging 11import os 12import select 13import socket 14import subprocess 15import time 16import unittest 17 18# mock 1.0.0 (in site-packages/chromite/third_party/mock.py) 19# which is an ancestor of Python's default library starting from Python 3.3. 20# See https://docs.python.org/3/library/unittest.mock.html 21import mock as pymock 22import six 23from six.moves import range, urllib 24 25import common 26 27from autotest_lib.client.common_lib import autotemp, utils 28from autotest_lib.client.common_lib.test_utils import mock 29 30metrics = utils.metrics_mock 31 32 33class test_read_one_line(unittest.TestCase): 34 def setUp(self): 35 self.god = mock.mock_god(ut=self) 36 self.god.stub_function(utils, "open") 37 38 39 def tearDown(self): 40 self.god.unstub_all() 41 42 43 def test_ip_to_long(self): 44 self.assertEqual(utils.ip_to_long('0.0.0.0'), 0) 45 self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295) 46 self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521) 47 self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320) 48 49 50 def test_long_to_ip(self): 51 self.assertEqual(utils.long_to_ip(0), '0.0.0.0') 52 self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255') 53 self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1') 54 self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8') 55 56 57 def test_create_subnet_mask(self): 58 self.assertEqual(utils.create_subnet_mask(0), 0) 59 self.assertEqual(utils.create_subnet_mask(32), 4294967295) 60 self.assertEqual(utils.create_subnet_mask(25), 4294967168) 61 62 63 def test_format_ip_with_mask(self): 64 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0), 65 '0.0.0.0/0') 66 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32), 67 '192.168.0.1/32') 68 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26), 69 '192.168.0.0/26') 70 self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26), 71 '192.168.0.192/26') 72 73 74 def create_test_file(self, contents): 75 test_file = six.StringIO(contents) 76 utils.open.expect_call("filename", "r").and_return(test_file) 77 78 79 def test_reads_one_line_file(self): 80 self.create_test_file("abc\n") 81 self.assertEqual("abc", utils.read_one_line("filename")) 82 self.god.check_playback() 83 84 85 def test_strips_read_lines(self): 86 self.create_test_file("abc \n") 87 self.assertEqual("abc ", utils.read_one_line("filename")) 88 self.god.check_playback() 89 90 91 def test_drops_extra_lines(self): 92 self.create_test_file("line 1\nline 2\nline 3\n") 93 self.assertEqual("line 1", utils.read_one_line("filename")) 94 self.god.check_playback() 95 96 97 def test_works_on_empty_file(self): 98 self.create_test_file("") 99 self.assertEqual("", utils.read_one_line("filename")) 100 self.god.check_playback() 101 102 103 def test_works_on_file_with_no_newlines(self): 104 self.create_test_file("line but no newline") 105 self.assertEqual("line but no newline", 106 utils.read_one_line("filename")) 107 self.god.check_playback() 108 109 110 def test_preserves_leading_whitespace(self): 111 self.create_test_file(" has leading whitespace") 112 self.assertEqual(" has leading whitespace", 113 utils.read_one_line("filename")) 114 115 116class test_write_one_line(unittest.TestCase): 117 def setUp(self): 118 self.god = mock.mock_god(ut=self) 119 self.god.stub_function(utils, "open") 120 121 122 def tearDown(self): 123 self.god.unstub_all() 124 125 126 def get_write_one_line_output(self, content): 127 test_file = mock.SaveDataAfterCloseStringIO() 128 utils.open.expect_call("filename", "w").and_return(test_file) 129 utils.write_one_line("filename", content) 130 self.god.check_playback() 131 return test_file.final_data 132 133 134 def test_writes_one_line_file(self): 135 self.assertEqual("abc\n", self.get_write_one_line_output("abc")) 136 137 138 def test_preserves_existing_newline(self): 139 self.assertEqual("abc\n", self.get_write_one_line_output("abc\n")) 140 141 142 def test_preserves_leading_whitespace(self): 143 self.assertEqual(" abc\n", self.get_write_one_line_output(" abc")) 144 145 146 def test_preserves_trailing_whitespace(self): 147 self.assertEqual("abc \n", self.get_write_one_line_output("abc ")) 148 149 150 def test_handles_empty_input(self): 151 self.assertEqual("\n", self.get_write_one_line_output("")) 152 153 154class test_open_write_close(unittest.TestCase): 155 def setUp(self): 156 self.god = mock.mock_god(ut=self) 157 self.god.stub_function(utils, "open") 158 159 160 def tearDown(self): 161 self.god.unstub_all() 162 163 164 def test_simple_functionality(self): 165 data = "\n\nwhee\n" 166 test_file = mock.SaveDataAfterCloseStringIO() 167 utils.open.expect_call("filename", "w").and_return(test_file) 168 utils.open_write_close("filename", data) 169 self.god.check_playback() 170 self.assertEqual(data, test_file.final_data) 171 172 def test_binary_functionality(self): 173 data = bytearray([0, 1, 3, 23, 0, 71, 254, 255, 127, 128]) 174 test_file = mock.SaveDataAfterCloseBytesIO() 175 utils.open.expect_call("filename", "wb").and_return(test_file) 176 utils.open_write_close("filename", data, is_binary=True) 177 self.god.check_playback() 178 self.assertEqual(data, test_file.final_data) 179 180 181class test_read_keyval(unittest.TestCase): 182 def setUp(self): 183 self.god = mock.mock_god(ut=self) 184 self.god.stub_function(utils, "open") 185 self.god.stub_function(os.path, "isdir") 186 self.god.stub_function(os.path, "exists") 187 188 189 def tearDown(self): 190 self.god.unstub_all() 191 192 193 def create_test_file(self, filename, contents): 194 test_file = six.StringIO(contents) 195 os.path.exists.expect_call(filename).and_return(True) 196 utils.open.expect_call(filename).and_return(test_file) 197 198 199 def read_keyval(self, contents): 200 os.path.isdir.expect_call("file").and_return(False) 201 self.create_test_file("file", contents) 202 keyval = utils.read_keyval("file") 203 self.god.check_playback() 204 return keyval 205 206 207 def test_returns_empty_when_file_doesnt_exist(self): 208 os.path.isdir.expect_call("file").and_return(False) 209 os.path.exists.expect_call("file").and_return(False) 210 self.assertEqual({}, utils.read_keyval("file")) 211 self.god.check_playback() 212 213 214 def test_accesses_files_directly(self): 215 os.path.isdir.expect_call("file").and_return(False) 216 self.create_test_file("file", "") 217 utils.read_keyval("file") 218 self.god.check_playback() 219 220 221 def test_accesses_directories_through_keyval_file(self): 222 os.path.isdir.expect_call("dir").and_return(True) 223 self.create_test_file("dir/keyval", "") 224 utils.read_keyval("dir") 225 self.god.check_playback() 226 227 228 def test_values_are_rstripped(self): 229 keyval = self.read_keyval("a=b \n") 230 self.assertEquals(keyval, {"a": "b"}) 231 232 233 def test_comments_are_ignored(self): 234 keyval = self.read_keyval("a=b # a comment\n") 235 self.assertEquals(keyval, {"a": "b"}) 236 237 238 def test_integers_become_ints(self): 239 keyval = self.read_keyval("a=1\n") 240 self.assertEquals(keyval, {"a": 1}) 241 self.assertEquals(int, type(keyval["a"])) 242 243 244 def test_float_values_become_floats(self): 245 keyval = self.read_keyval("a=1.5\n") 246 self.assertEquals(keyval, {"a": 1.5}) 247 self.assertEquals(float, type(keyval["a"])) 248 249 250 def test_multiple_lines(self): 251 keyval = self.read_keyval("a=one\nb=two\n") 252 self.assertEquals(keyval, {"a": "one", "b": "two"}) 253 254 255 def test_the_last_duplicate_line_is_used(self): 256 keyval = self.read_keyval("a=one\nb=two\na=three\n") 257 self.assertEquals(keyval, {"a": "three", "b": "two"}) 258 259 260 def test_extra_equals_are_included_in_values(self): 261 keyval = self.read_keyval("a=b=c\n") 262 self.assertEquals(keyval, {"a": "b=c"}) 263 264 265 def test_non_alphanumeric_keynames_are_rejected(self): 266 self.assertRaises(ValueError, self.read_keyval, "a$=one\n") 267 268 269 def test_underscores_are_allowed_in_key_names(self): 270 keyval = self.read_keyval("a_b=value\n") 271 self.assertEquals(keyval, {"a_b": "value"}) 272 273 274 def test_dashes_are_allowed_in_key_names(self): 275 keyval = self.read_keyval("a-b=value\n") 276 self.assertEquals(keyval, {"a-b": "value"}) 277 278 def test_empty_value_is_allowed(self): 279 keyval = self.read_keyval("a=\n") 280 self.assertEquals(keyval, {"a": ""}) 281 282 283class test_write_keyval(unittest.TestCase): 284 def setUp(self): 285 self.god = mock.mock_god(ut=self) 286 self.god.stub_function(utils, "open") 287 self.god.stub_function(os.path, "isdir") 288 289 290 def tearDown(self): 291 self.god.unstub_all() 292 293 294 def assertHasLines(self, value, lines): 295 vlines = value.splitlines() 296 vlines.sort() 297 self.assertEquals(vlines, sorted(lines)) 298 299 300 def write_keyval(self, filename, dictionary, expected_filename=None, 301 type_tag=None): 302 if expected_filename is None: 303 expected_filename = filename 304 test_file = six.StringIO() 305 self.god.stub_function(test_file, "close") 306 utils.open.expect_call(expected_filename, "a").and_return(test_file) 307 test_file.close.expect_call() 308 if type_tag is None: 309 utils.write_keyval(filename, dictionary) 310 else: 311 utils.write_keyval(filename, dictionary, type_tag) 312 return test_file.getvalue() 313 314 315 def write_keyval_file(self, dictionary, type_tag=None): 316 os.path.isdir.expect_call("file").and_return(False) 317 return self.write_keyval("file", dictionary, type_tag=type_tag) 318 319 320 def test_accesses_files_directly(self): 321 os.path.isdir.expect_call("file").and_return(False) 322 result = self.write_keyval("file", {"a": "1"}) 323 self.assertEquals(result, "a=1\n") 324 325 326 def test_accesses_directories_through_keyval_file(self): 327 os.path.isdir.expect_call("dir").and_return(True) 328 result = self.write_keyval("dir", {"b": "2"}, "dir/keyval") 329 self.assertEquals(result, "b=2\n") 330 331 332 def test_numbers_are_stringified(self): 333 result = self.write_keyval_file({"c": 3}) 334 self.assertEquals(result, "c=3\n") 335 336 337 def test_type_tags_are_excluded_by_default(self): 338 result = self.write_keyval_file({"d": "a string"}) 339 self.assertEquals(result, "d=a string\n") 340 self.assertRaises(ValueError, self.write_keyval_file, 341 {"d{perf}": "a string"}) 342 343 344 def test_perf_tags_are_allowed(self): 345 result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2}, 346 type_tag="perf") 347 self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"]) 348 self.assertRaises(ValueError, self.write_keyval_file, 349 {"a": 1, "b": 2}, type_tag="perf") 350 351 352 def test_non_alphanumeric_keynames_are_rejected(self): 353 self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0}) 354 355 356 def test_underscores_are_allowed_in_key_names(self): 357 result = self.write_keyval_file({"a_b": "value"}) 358 self.assertEquals(result, "a_b=value\n") 359 360 361 def test_dashes_are_allowed_in_key_names(self): 362 result = self.write_keyval_file({"a-b": "value"}) 363 self.assertEquals(result, "a-b=value\n") 364 365 366class test_is_url(unittest.TestCase): 367 def test_accepts_http(self): 368 self.assertTrue(utils.is_url("http://example.com")) 369 370 371 def test_accepts_ftp(self): 372 self.assertTrue(utils.is_url("ftp://ftp.example.com")) 373 374 375 def test_rejects_local_path(self): 376 self.assertFalse(utils.is_url("/home/username/file")) 377 378 379 def test_rejects_local_filename(self): 380 self.assertFalse(utils.is_url("filename")) 381 382 383 def test_rejects_relative_local_path(self): 384 self.assertFalse(utils.is_url("somedir/somesubdir/file")) 385 386 387 def test_rejects_local_path_containing_url(self): 388 self.assertFalse(utils.is_url("somedir/http://path/file")) 389 390 391class test_urlopen(unittest.TestCase): 392 def setUp(self): 393 self.god = mock.mock_god(ut=self) 394 395 396 def tearDown(self): 397 self.god.unstub_all() 398 399 400 def stub_urlopen_with_timeout_comparison(self, test_func, expected_return, 401 *expected_args): 402 expected_args += (None,) * (2 - len(expected_args)) 403 def urlopen(url, data=None): 404 self.assertEquals(expected_args, (url,data)) 405 test_func(socket.getdefaulttimeout()) 406 return expected_return 407 self.god.stub_with(urllib.request, "urlopen", urlopen) 408 409 410 def stub_urlopen_with_timeout_check(self, expected_timeout, 411 expected_return, *expected_args): 412 def test_func(timeout): 413 self.assertEquals(timeout, expected_timeout) 414 self.stub_urlopen_with_timeout_comparison(test_func, expected_return, 415 *expected_args) 416 417 418 def test_timeout_set_during_call(self): 419 self.stub_urlopen_with_timeout_check(30, "retval", "url") 420 retval = utils.urlopen("url", timeout=30) 421 self.assertEquals(retval, "retval") 422 423 424 def test_timeout_reset_after_call(self): 425 old_timeout = socket.getdefaulttimeout() 426 self.stub_urlopen_with_timeout_check(30, None, "url") 427 try: 428 socket.setdefaulttimeout(1234) 429 utils.urlopen("url", timeout=30) 430 self.assertEquals(1234, socket.getdefaulttimeout()) 431 finally: 432 socket.setdefaulttimeout(old_timeout) 433 434 435 def test_timeout_set_by_default(self): 436 def test_func(timeout): 437 self.assertTrue(timeout is not None) 438 self.stub_urlopen_with_timeout_comparison(test_func, None, "url") 439 utils.urlopen("url") 440 441 442 def test_args_are_untouched(self): 443 self.stub_urlopen_with_timeout_check(30, None, "http://url", 444 "POST data") 445 utils.urlopen("http://url", timeout=30, data="POST data") 446 447 448class test_urlretrieve(unittest.TestCase): 449 def setUp(self): 450 self.god = mock.mock_god(ut=self) 451 452 453 def tearDown(self): 454 self.god.unstub_all() 455 456 457 def test_urlopen_passed_arguments(self): 458 self.god.stub_function(utils, "urlopen") 459 self.god.stub_function(utils.shutil, "copyfileobj") 460 self.god.stub_function(utils, "open") 461 462 url = "url" 463 dest = "somefile" 464 data = object() 465 timeout = 10 466 467 src_file = self.god.create_mock_class(io.IOBase, "file") 468 dest_file = self.god.create_mock_class(io.IOBase, "file") 469 470 (utils.urlopen.expect_call(url, data=data, timeout=timeout) 471 .and_return(src_file)) 472 utils.open.expect_call(dest, "wb").and_return(dest_file) 473 utils.shutil.copyfileobj.expect_call(src_file, dest_file) 474 dest_file.close.expect_call() 475 src_file.close.expect_call() 476 477 utils.urlretrieve(url, dest, data=data, timeout=timeout) 478 self.god.check_playback() 479 480 481class test_merge_trees(unittest.TestCase): 482 # a some path-handling helper functions 483 def src(self, *path_segments): 484 return os.path.join(self.src_tree.name, *path_segments) 485 486 487 def dest(self, *path_segments): 488 return os.path.join(self.dest_tree.name, *path_segments) 489 490 491 def paths(self, *path_segments): 492 return self.src(*path_segments), self.dest(*path_segments) 493 494 495 def assertFileEqual(self, *path_segments): 496 src, dest = self.paths(*path_segments) 497 self.assertEqual(True, os.path.isfile(src)) 498 self.assertEqual(True, os.path.isfile(dest)) 499 self.assertEqual(os.path.getsize(src), os.path.getsize(dest)) 500 self.assertEqual(open(src).read(), open(dest).read()) 501 502 503 def assertFileContents(self, contents, *path_segments): 504 dest = self.dest(*path_segments) 505 self.assertEqual(True, os.path.isfile(dest)) 506 self.assertEqual(os.path.getsize(dest), len(contents)) 507 self.assertEqual(contents, open(dest).read()) 508 509 510 def setUp(self): 511 self.src_tree = autotemp.tempdir(unique_id='utilsrc') 512 self.dest_tree = autotemp.tempdir(unique_id='utilsdest') 513 514 # empty subdirs 515 os.mkdir(self.src("empty")) 516 os.mkdir(self.dest("empty")) 517 518 519 def tearDown(self): 520 self.src_tree.clean() 521 self.dest_tree.clean() 522 523 524 def test_both_dont_exist(self): 525 utils.merge_trees(*self.paths("empty")) 526 527 528 def test_file_only_at_src(self): 529 with open(self.src("src_only"), "w") as wf: 530 print("line 1", file=wf) 531 utils.merge_trees(*self.paths("src_only")) 532 self.assertFileEqual("src_only") 533 534 535 def test_file_only_at_dest(self): 536 with open(self.dest("dest_only"), "w") as wf: 537 print("line 1", file=wf) 538 utils.merge_trees(*self.paths("dest_only")) 539 self.assertEqual(False, os.path.exists(self.src("dest_only"))) 540 self.assertFileContents("line 1\n", "dest_only") 541 542 543 def test_file_at_both(self): 544 with open(self.dest("in_both"), "w") as wf1: 545 print("line 1", file=wf1) 546 with open(self.src("in_both"), "w") as wf2: 547 print("line 2", file=wf2) 548 utils.merge_trees(*self.paths("in_both")) 549 self.assertFileContents("line 1\nline 2\n", "in_both") 550 551 552 def test_directory_with_files_in_both(self): 553 with open(self.dest("in_both"), "w") as wf1: 554 print("line 1", file=wf1) 555 with open(self.src("in_both"), "w") as wf2: 556 print("line 3", file=wf2) 557 utils.merge_trees(*self.paths()) 558 self.assertFileContents("line 1\nline 3\n", "in_both") 559 560 561 def test_directory_with_mix_of_files(self): 562 with open(self.dest("in_dest"), "w") as wf1: 563 print("dest line", file=wf1) 564 with open(self.src("in_src"), "w") as wf2: 565 print("src line", file=wf2) 566 utils.merge_trees(*self.paths()) 567 self.assertFileContents("dest line\n", "in_dest") 568 self.assertFileContents("src line\n", "in_src") 569 570 571 def test_directory_with_subdirectories(self): 572 os.mkdir(self.src("src_subdir")) 573 with open(self.src("src_subdir", "subfile"), "w") as wf1: 574 print("subdir line", file=wf1) 575 os.mkdir(self.src("both_subdir")) 576 os.mkdir(self.dest("both_subdir")) 577 with open(self.src("both_subdir", "subfile"), "w") as wf2: 578 print("src line", file=wf2) 579 with open(self.dest("both_subdir", "subfile"), "w") as wf3: 580 print("dest line", file=wf3) 581 utils.merge_trees(*self.paths()) 582 self.assertFileContents("subdir line\n", "src_subdir", "subfile") 583 self.assertFileContents("dest line\nsrc line\n", "both_subdir", 584 "subfile") 585 586 587class test_get_relative_path(unittest.TestCase): 588 def test_not_absolute(self): 589 self.assertRaises(AssertionError, utils.get_relative_path, "a", "b") 590 591 def test_same_dir(self): 592 self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c") 593 594 def test_forward_dir(self): 595 self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d") 596 597 def test_previous_dir(self): 598 self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..") 599 600 def test_parallel_dir(self): 601 self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"), 602 "../../../c/d") 603 604 605class test_sh_escape(unittest.TestCase): 606 def _test_in_shell(self, text): 607 escaped_text = utils.sh_escape(text) 608 proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True, 609 stdin=open(os.devnull, 'r'), 610 stdout=subprocess.PIPE, 611 stderr=open(os.devnull, 'w')) 612 stdout, _ = proc.communicate() 613 self.assertEqual(proc.returncode, 0) 614 self.assertEqual(stdout[:-1].decode(), text) 615 616 617 def test_normal_string(self): 618 self._test_in_shell('abcd') 619 620 621 def test_spaced_string(self): 622 self._test_in_shell('abcd efgh') 623 624 625 def test_dollar(self): 626 self._test_in_shell('$') 627 628 629 def test_single_quote(self): 630 self._test_in_shell('\'') 631 632 633 def test_single_quoted_string(self): 634 self._test_in_shell('\'efgh\'') 635 636 637 def test_string_with_single_quote(self): 638 self._test_in_shell("a'b") 639 640 641 def test_string_with_escaped_single_quote(self): 642 self._test_in_shell(r"a\'b") 643 644 645 def test_double_quote(self): 646 self._test_in_shell('"') 647 648 649 def test_double_quoted_string(self): 650 self._test_in_shell('"abcd"') 651 652 653 def test_backtick(self): 654 self._test_in_shell('`') 655 656 657 def test_backticked_string(self): 658 self._test_in_shell('`jklm`') 659 660 661 def test_backslash(self): 662 self._test_in_shell('\\') 663 664 665 def test_backslashed_special_characters(self): 666 self._test_in_shell('\\$') 667 self._test_in_shell('\\"') 668 self._test_in_shell('\\\'') 669 self._test_in_shell('\\`') 670 671 672 def test_backslash_codes(self): 673 self._test_in_shell('\\n') 674 self._test_in_shell('\\r') 675 self._test_in_shell('\\t') 676 self._test_in_shell('\\v') 677 self._test_in_shell('\\b') 678 self._test_in_shell('\\a') 679 self._test_in_shell('\\000') 680 681 def test_real_newline(self): 682 self._test_in_shell('\n') 683 self._test_in_shell('\\\n') 684 685 686class test_sh_quote_word(test_sh_escape): 687 """Run tests on sh_quote_word. 688 689 Inherit from test_sh_escape to get the same tests to run on both. 690 """ 691 692 def _test_in_shell(self, text): 693 quoted_word = utils.sh_quote_word(text) 694 echoed_value = subprocess.check_output('echo %s' % quoted_word, 695 shell=True) 696 self.assertEqual(echoed_value.decode(), text + '\n') 697 698 699class test_nested_sh_quote_word(test_sh_quote_word): 700 """Run nested tests on sh_quote_word. 701 702 Inherit from test_sh_quote_word to get the same tests to run on both. 703 """ 704 705 def _test_in_shell(self, text): 706 command = 'echo ' + utils.sh_quote_word(text) 707 nested_command = 'echo ' + utils.sh_quote_word(command) 708 produced_command = subprocess.check_output(nested_command, shell=True) 709 echoed_value = subprocess.check_output(produced_command, shell=True) 710 self.assertEqual(echoed_value.decode(), text + '\n') 711 712 713class test_run(unittest.TestCase): 714 """ 715 Test the utils.run() function. 716 717 Note: This test runs simple external commands to test the utils.run() 718 API without assuming implementation details. 719 """ 720 721 # Log levels in ascending severity. 722 LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, 723 logging.CRITICAL] 724 725 726 def setUp(self): 727 self.god = mock.mock_god(ut=self) 728 self.god.stub_function(utils.logging, 'warning') 729 self.god.stub_function(utils.logging, 'debug') 730 731 # Log level -> StringIO.StringIO. 732 self.logs = {} 733 for level in self.LOG_LEVELS: 734 self.logs[level] = six.StringIO() 735 736 # Override logging_manager.LoggingFile to return buffers. 737 def logging_file(level=None, prefix=None): 738 return self.logs[level] 739 self.god.stub_with(utils.logging_manager, 'LoggingFile', logging_file) 740 741 def tearDown(self): 742 self.god.unstub_all() 743 744 745 def __check_result(self, result, command, exit_status=0, stdout='', 746 stderr=''): 747 self.assertEquals(result.command, command) 748 self.assertEquals(result.exit_status, exit_status) 749 self.assertEquals(result.stdout, stdout) 750 self.assertEquals(result.stderr, stderr) 751 752 753 def __get_logs(self): 754 """Returns contents of log buffers at all levels. 755 756 @return: 5-element list of strings corresponding to logged messages 757 at the levels in self.LOG_LEVELS. 758 """ 759 return [self.logs[v].getvalue() for v in self.LOG_LEVELS] 760 761 762 def test_default_simple(self): 763 cmd = 'echo "hello world"' 764 # expect some king of logging.debug() call but don't care about args 765 utils.logging.debug.expect_any_call() 766 self.__check_result(utils.run(cmd), cmd, stdout='hello world\n') 767 768 769 def test_default_failure(self): 770 cmd = 'exit 11' 771 try: 772 utils.run(cmd, verbose=False) 773 except utils.error.CmdError as err: 774 self.__check_result(err.result_obj, cmd, exit_status=11) 775 776 777 def test_ignore_status(self): 778 cmd = 'echo error >&2 && exit 11' 779 self.__check_result(utils.run(cmd, ignore_status=True, verbose=False), 780 cmd, exit_status=11, stderr='error\n') 781 782 783 def test_timeout(self): 784 # we expect a logging.warning() message, don't care about the contents 785 utils.logging.warning.expect_any_call() 786 try: 787 utils.run('echo -n output && sleep 10', timeout=1, verbose=False) 788 except utils.error.CmdError as err: 789 self.assertEquals(err.result_obj.stdout, 'output') 790 791 792 def test_stdout_stderr_tee(self): 793 cmd = 'echo output && echo error >&2' 794 stdout_tee = six.StringIO() 795 stderr_tee = six.StringIO() 796 797 self.__check_result(utils.run( 798 cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee, 799 verbose=False), cmd, stdout='output\n', stderr='error\n') 800 self.assertEqual(stdout_tee.getvalue(), 'output\n') 801 self.assertEqual(stderr_tee.getvalue(), 'error\n') 802 803 804 def test_stdin_string(self): 805 cmd = 'cat' 806 self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'), 807 cmd, stdout='hi!\n') 808 809 810 def test_stdout_tee_to_logs_info(self): 811 """Test logging stdout at the info level.""" 812 utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS, 813 stdout_level=logging.INFO, verbose=False) 814 self.assertEqual(self.__get_logs(), ['', 'output\n', '', '', '']) 815 816 817 def test_stdout_tee_to_logs_warning(self): 818 """Test logging stdout at the warning level.""" 819 utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS, 820 stdout_level=logging.WARNING, verbose=False) 821 self.assertEqual(self.__get_logs(), ['', '', 'output\n', '', '']) 822 823 824 def test_stdout_and_stderr_tee_to_logs(self): 825 """Test simultaneous stdout and stderr log levels.""" 826 utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS, 827 stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO, 828 stderr_level=logging.ERROR, verbose=False) 829 self.assertEqual(self.__get_logs(), ['', 'output\n', '', 'error\n', '']) 830 831 832 def test_default_expected_stderr_log_level(self): 833 """Test default expected stderr log level. 834 835 stderr should be logged at the same level as stdout when 836 stderr_is_expected is true and stderr_level isn't passed. 837 """ 838 utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS, 839 stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO, 840 stderr_is_expected=True, verbose=False) 841 self.assertEqual(self.__get_logs(), ['', 'output\nerror\n', '', '', '']) 842 843 844 def test_safe_args(self): 845 # NOTE: The string in expected_quoted_cmd depends on the internal 846 # implementation of shell quoting which is used by utils.run(), 847 # in this case, sh_quote_word(). 848 expected_quoted_cmd = "echo 'hello \"world' again" 849 self.__check_result(utils.run( 850 'echo', verbose=False, args=('hello "world', 'again')), 851 expected_quoted_cmd, stdout='hello "world again\n') 852 853 854 def test_safe_args_given_string(self): 855 self.assertRaises(TypeError, utils.run, 'echo', args='hello') 856 857 858 def test_wait_interrupt(self): 859 """Test that we actually select twice if the first one returns EINTR.""" 860 utils.logging.debug.expect_any_call() 861 862 bg_job = utils.BgJob('echo "hello world"') 863 bg_job.result.exit_status = 0 864 self.god.stub_function(utils.select, 'select') 865 866 utils.select.select.expect_any_call().and_raises( 867 select.error(errno.EINTR, 'Select interrupted')) 868 utils.logging.warning.expect_any_call() 869 870 utils.select.select.expect_any_call().and_return( 871 ([bg_job.sp.stdout, bg_job.sp.stderr], [], None)) 872 utils.logging.warning.expect_any_call() 873 874 self.assertFalse( 875 utils._wait_for_commands([bg_job], time.time(), None)) 876 877 878class test_compare_versions(unittest.TestCase): 879 def test_zerofill(self): 880 self.assertEqual(utils.compare_versions('1.7', '1.10'), -1) 881 self.assertEqual(utils.compare_versions('1.222', '1.3'), 1) 882 self.assertEqual(utils.compare_versions('1.03', '1.3'), 0) 883 884 885 def test_unequal_len(self): 886 self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1) 887 self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1) 888 889 890 def test_dash_delimited(self): 891 self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1) 892 self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1) 893 self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0) 894 895 896 def test_alphabets(self): 897 self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1) 898 self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1) 899 self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0) 900 901 902 def test_mix_symbols(self): 903 self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1) 904 self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1) 905 self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0) 906 907 self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1) 908 self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1) 909 self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0) 910 911 912class test_args_to_dict(unittest.TestCase): 913 def test_no_args(self): 914 result = utils.args_to_dict([]) 915 self.assertEqual({}, result) 916 917 918 def test_matches(self): 919 result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:', 920 'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']) 921 self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'', 922 'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}) 923 924 925 def test_unmatches(self): 926 # Temporarily shut warning messages from args_to_dict() when an argument 927 # doesn't match its pattern. 928 logger = logging.getLogger() 929 saved_level = logger.level 930 logger.setLevel(logging.ERROR) 931 932 try: 933 result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b', 934 ':VAL', '=VVV', 'WORD']) 935 self.assertEqual({}, result) 936 finally: 937 # Restore level. 938 logger.setLevel(saved_level) 939 940 941class test_get_random_port(unittest.TestCase): 942 def do_bind(self, port, socket_type, socket_proto): 943 s = socket.socket(socket.AF_INET, socket_type, socket_proto) 944 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 945 s.bind(('', port)) 946 return s 947 948 949 def test_get_port(self): 950 for _ in range(100): 951 p = utils.get_unused_port() 952 s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP) 953 self.assert_(s.getsockname()) 954 s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 955 self.assert_(s.getsockname()) 956 957 958def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6): 959 """Test global function. 960 """ 961 962 963class TestClass(object): 964 """Test class. 965 """ 966 967 def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6): 968 """Test instance function. 969 """ 970 971 972 @classmethod 973 def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6): 974 """Test class function. 975 """ 976 977 978 @staticmethod 979 def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6): 980 """Test static function. 981 """ 982 983 984class GetFunctionArgUnittest(unittest.TestCase): 985 """Tests for method get_function_arg_value.""" 986 987 def run_test(self, func, insert_arg): 988 """Run test. 989 990 @param func: Function being called with given arguments. 991 @param insert_arg: Set to True to insert an object in the argument list. 992 This is to mock instance/class object. 993 """ 994 if insert_arg: 995 args = (None, 1, 2, 3) 996 else: 997 args = (1, 2, 3) 998 for i in range(1, 7): 999 self.assertEquals(utils.get_function_arg_value( 1000 func, 'arg%d'%i, args, {}), i) 1001 1002 self.assertEquals(utils.get_function_arg_value( 1003 func, 'arg7', args, {'arg7': 7}), 7) 1004 self.assertRaises( 1005 KeyError, utils.get_function_arg_value, 1006 func, 'arg3', args[:-1], {}) 1007 1008 1009 def test_global_function(self): 1010 """Test global function. 1011 """ 1012 self.run_test(test_function, False) 1013 1014 1015 def test_instance_function(self): 1016 """Test instance function. 1017 """ 1018 self.run_test(TestClass().test_instance_function, True) 1019 1020 1021 def test_class_function(self): 1022 """Test class function. 1023 """ 1024 self.run_test(TestClass.test_class_function, True) 1025 1026 1027 def test_static_function(self): 1028 """Test static function. 1029 """ 1030 self.run_test(TestClass.test_static_function, False) 1031 1032 1033class IsInSameSubnetUnittest(unittest.TestCase): 1034 """Test is_in_same_subnet function.""" 1035 1036 def test_is_in_same_subnet(self): 1037 """Test is_in_same_subnet function.""" 1038 self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2', 1039 23)) 1040 self.assertFalse(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2', 1041 24)) 1042 self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.0.255', 1043 24)) 1044 self.assertFalse(utils.is_in_same_subnet('191.168.0.0', '192.168.0.0', 1045 24)) 1046 1047 1048class GetWirelessSsidUnittest(unittest.TestCase): 1049 """Test get_wireless_ssid function.""" 1050 1051 DEFAULT_SSID = 'default' 1052 SSID_1 = 'ssid_1' 1053 SSID_2 = 'ssid_2' 1054 SSID_3 = 'ssid_3' 1055 1056 def test_get_wireless_ssid(self): 1057 """Test is_in_same_subnet function.""" 1058 god = mock.mock_god() 1059 god.stub_function_to_return(utils.CONFIG, 'get_config_value', 1060 self.DEFAULT_SSID) 1061 god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex', 1062 {'wireless_ssid_1.2.3.4/24': self.SSID_1, 1063 'wireless_ssid_4.3.2.1/16': self.SSID_2, 1064 'wireless_ssid_4.3.2.111/32': self.SSID_3}) 1065 self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100')) 1066 self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100')) 1067 self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111')) 1068 self.assertEqual(self.DEFAULT_SSID, 1069 utils.get_wireless_ssid('100.0.0.100')) 1070 1071 1072class LaunchControlBuildParseUnittest(unittest.TestCase): 1073 """Test various parsing functions related to Launch Control builds and 1074 devices. 1075 """ 1076 1077 def test_parse_launch_control_target(self): 1078 """Test parse_launch_control_target function.""" 1079 target_tests = { 1080 ('shamu', 'userdebug'): 'shamu-userdebug', 1081 ('shamu', 'eng'): 'shamu-eng', 1082 ('shamu-board', 'eng'): 'shamu-board-eng', 1083 (None, None): 'bad_target', 1084 (None, None): 'target'} 1085 for result, target in target_tests.items(): 1086 self.assertEqual(result, utils.parse_launch_control_target(target)) 1087 1088 1089class GetOffloaderUriTest(unittest.TestCase): 1090 """Test get_offload_gsuri function.""" 1091 _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket' 1092 1093 def setUp(self): 1094 self.god = mock.mock_god() 1095 1096 def tearDown(self): 1097 self.god.unstub_all() 1098 1099 def test_get_default_lab_offload_gsuri(self): 1100 """Test default lab offload gsuri .""" 1101 self.god.mock_up(utils.CONFIG, 'CONFIG') 1102 self.god.stub_function_to_return(utils, 'is_moblab', False) 1103 self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI, 1104 utils.get_offload_gsuri()) 1105 1106 self.god.check_playback() 1107 1108 def test_get_default_moblab_offload_gsuri(self): 1109 self.god.mock_up(utils.CONFIG, 'CONFIG') 1110 self.god.stub_function_to_return(utils, 'is_moblab', True) 1111 utils.CONFIG.get_config_value.expect_call( 1112 'CROS', 'image_storage_server').and_return( 1113 self._IMAGE_STORAGE_SERVER) 1114 self.god.stub_function_to_return(utils, 1115 'get_moblab_serial_number', 'test_serial_number') 1116 self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id') 1117 expected_gsuri = '%sresults/%s/%s/' % ( 1118 self._IMAGE_STORAGE_SERVER, 'test_serial_number', 'test_id') 1119 cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI 1120 utils.DEFAULT_OFFLOAD_GSURI = None 1121 gsuri = utils.get_offload_gsuri() 1122 utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri 1123 self.assertEqual(expected_gsuri, gsuri) 1124 1125 self.god.check_playback() 1126 1127 def test_get_moblab_offload_gsuri(self): 1128 """Test default lab offload gsuri .""" 1129 self.god.mock_up(utils.CONFIG, 'CONFIG') 1130 self.god.stub_function_to_return(utils, 'is_moblab', True) 1131 self.god.stub_function_to_return(utils, 1132 'get_moblab_serial_number', 'test_serial_number') 1133 self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id') 1134 gsuri = '%s%s/%s/' % ( 1135 utils.DEFAULT_OFFLOAD_GSURI, 'test_serial_number', 'test_id') 1136 self.assertEqual(gsuri, utils.get_offload_gsuri()) 1137 1138 self.god.check_playback() 1139 1140 1141 1142class MockMetricsTest(unittest.TestCase): 1143 """Test metrics mock class can handle various metrics calls.""" 1144 1145 def test_Counter(self): 1146 """Test the mock class can create an instance and call any method. 1147 """ 1148 c = metrics.Counter('counter') 1149 c.increment(fields={'key': 1}) 1150 1151 1152 def test_Context(self): 1153 """Test the mock class can handle context class. 1154 """ 1155 test_value = None 1156 with metrics.SecondsTimer('context') as t: 1157 test_value = 'called_in_context' 1158 t['random_key'] = 'pass' 1159 self.assertEqual('called_in_context', test_value) 1160 1161 1162 def test_decorator(self): 1163 """Test the mock class can handle decorator. 1164 """ 1165 class TestClass(object): 1166 1167 def __init__(self): 1168 self.value = None 1169 1170 test_value = TestClass() 1171 test_value.value = None 1172 @metrics.SecondsTimerDecorator('decorator') 1173 def test(arg): 1174 arg.value = 'called_in_decorator' 1175 1176 test(test_value) 1177 self.assertEqual('called_in_decorator', test_value.value) 1178 1179 1180 def test_setitem(self): 1181 """Test the mock class can handle set item call. 1182 """ 1183 timer = metrics.SecondsTimer('name') 1184 timer['random_key'] = 'pass' 1185 1186 1187class test_background_sample(unittest.TestCase): 1188 """Test that the background sample can sample as desired. 1189 """ 1190 1191 def test_can_sample(self): 1192 """Test that a simple sample will work with no other complications. 1193 """ 1194 should_be_sampled = 'name' 1195 1196 def sample_function(): 1197 """Return value of variable stored in method.""" 1198 return should_be_sampled 1199 still_sampling = True 1200 1201 t = utils.background_sample_until_condition( 1202 function=sample_function, 1203 condition=lambda: still_sampling, 1204 timeout=5, 1205 sleep_interval=0.1) 1206 result = t.finish() 1207 self.assertIn(should_be_sampled, result) 1208 1209 1210 def test_samples_multiple_values(self): 1211 """Test that a sample will work and actually samples at the necessary 1212 intervals, such that it will pick up changes. 1213 """ 1214 should_be_sampled = 'name' 1215 1216 def sample_function(): 1217 """Return value of variable stored in method.""" 1218 return should_be_sampled 1219 still_sampling = True 1220 1221 t = utils.background_sample_until_condition( 1222 function=sample_function, 1223 condition=lambda: still_sampling, 1224 timeout=5, 1225 sleep_interval=0.1) 1226 # Let it sample values some with the initial value. 1227 time.sleep(2.5) 1228 # It should also sample some with the new value. 1229 should_be_sampled = 'noname' 1230 result = t.finish() 1231 self.assertIn('name', result) 1232 self.assertIn('noname', result) 1233 1234 1235class FakeTime(object): 1236 """Provides time() and sleep() for faking time module. 1237 """ 1238 1239 def __init__(self, start_time): 1240 self._time = start_time 1241 1242 1243 def time(self): 1244 return self._time 1245 1246 1247 def sleep(self, interval): 1248 self._time += interval 1249 1250 1251class TimeModuleMockTestCase(unittest.TestCase): 1252 """Mocks up utils.time with a FakeTime. 1253 1254 It substitudes time.time() and time.sleep() with FakeTime.time() 1255 and FakeTime.sleep(), respectively. 1256 """ 1257 1258 def setUp(self): 1259 self.fake_time_begin = 10 1260 self.fake_time = FakeTime(self.fake_time_begin) 1261 self.patcher = pymock.patch( 1262 'autotest_lib.client.common_lib.utils.time') 1263 self.time_mock = self.patcher.start() 1264 self.addCleanup(self.patcher.stop) 1265 self.time_mock.time.side_effect = self.fake_time.time 1266 self.time_mock.sleep.side_effect = self.fake_time.sleep 1267 1268 1269def always_raise(): 1270 """A function that raises an exception.""" 1271 raise Exception('always raise') 1272 1273 1274def fail_n_times(count): 1275 """Creates a function that returns False for the first count-th calls. 1276 1277 @return a function returns False for the first count-th calls and True 1278 afterwards. 1279 """ 1280 counter = itertools.count(count, -1) 1281 return lambda: next(counter) <= 0 1282 1283 1284class test_poll_for_condition(TimeModuleMockTestCase): 1285 """Test poll_for_condition. 1286 """ 1287 1288 def test_ok(self): 1289 """Test polling condition that returns True. 1290 """ 1291 self.assertTrue(utils.poll_for_condition(lambda: True)) 1292 1293 1294 def test_ok_evaluated_as_true(self): 1295 """Test polling condition which's return value is evaluated as True. 1296 """ 1297 self.assertEqual(1, utils.poll_for_condition(lambda: 1)) 1298 1299 self.assertEqual('something', 1300 utils.poll_for_condition(lambda: 'something')) 1301 1302 1303 def test_fail(self): 1304 """Test polling condition that returns False. 1305 1306 Expect TimeoutError exception as neither customized exception nor 1307 exception raised from condition(). 1308 """ 1309 with self.assertRaises(utils.TimeoutError): 1310 utils.poll_for_condition(lambda: False, timeout=3, sleep_interval=1) 1311 self.assertEqual(3, self.time_mock.sleep.call_count) 1312 1313 1314 def test_fail_evaluated_as_false(self): 1315 """Test polling condition which's return value is evaluated as False. 1316 1317 Expect TimeoutError exception as neither customized exception nor 1318 exception raised from condition(). 1319 """ 1320 with self.assertRaises(utils.TimeoutError): 1321 utils.poll_for_condition(lambda: 0, timeout=3, sleep_interval=1) 1322 self.assertEqual(3, self.time_mock.sleep.call_count) 1323 1324 with self.assertRaises(utils.TimeoutError): 1325 utils.poll_for_condition(lambda: None, timeout=3, sleep_interval=1) 1326 1327 1328 def test_exception_arg(self): 1329 """Test polling condition always fails. 1330 1331 Expect exception raised by 'exception' args. 1332 """ 1333 with self.assertRaisesRegexp(Exception, 'from args'): 1334 utils.poll_for_condition(lambda: False, 1335 exception=Exception('from args'), 1336 timeout=3, sleep_interval=1) 1337 self.assertEqual(3, self.time_mock.sleep.call_count) 1338 1339 1340 def test_exception_from_condition(self): 1341 """Test polling condition always fails. 1342 1343 Expect exception raised by condition(). 1344 """ 1345 with self.assertRaisesRegexp(Exception, 'always raise'): 1346 utils.poll_for_condition(always_raise, 1347 exception=Exception('from args'), 1348 timeout=3, sleep_interval=1) 1349 # For poll_for_condition, if condition() raises exception, it raises 1350 # immidiately without retry. So sleep() should not be called. 1351 self.time_mock.sleep.assert_not_called() 1352 1353 1354 def test_ok_after_retry(self): 1355 """Test polling a condition which is success after retry twice. 1356 """ 1357 self.assertTrue(utils.poll_for_condition(fail_n_times(2), timeout=3, 1358 sleep_interval=1)) 1359 1360 1361 def test_cannot_wait(self): 1362 """Test polling a condition which fails till timeout. 1363 """ 1364 with self.assertRaisesRegexp( 1365 utils.TimeoutError, 1366 'Timed out waiting for unnamed condition'): 1367 utils.poll_for_condition(fail_n_times(4), timeout=3, 1368 sleep_interval=1) 1369 self.assertEqual(3, self.time_mock.sleep.call_count) 1370 1371 1372class test_poll_for_condition_ex(TimeModuleMockTestCase): 1373 """Test poll_for_condition_ex. 1374 """ 1375 1376 def test_ok(self): 1377 """Test polling condition that returns True. 1378 """ 1379 self.assertTrue(utils.poll_for_condition_ex(lambda: True)) 1380 1381 1382 def test_ok_evaluated_as_true(self): 1383 """Test polling condition which's return value is evaluated as True. 1384 """ 1385 self.assertEqual(1, utils.poll_for_condition_ex(lambda: 1)) 1386 1387 self.assertEqual('something', 1388 utils.poll_for_condition_ex(lambda: 'something')) 1389 1390 1391 def test_fail(self): 1392 """Test polling condition that returns False. 1393 1394 Expect TimeoutError raised. 1395 """ 1396 with self.assertRaisesRegexp( 1397 utils.TimeoutError, 1398 'Timed out waiting for unamed condition'): 1399 utils.poll_for_condition_ex(lambda: False, timeout=3, 1400 sleep_interval=1) 1401 self.assertEqual(2, self.time_mock.sleep.call_count) 1402 1403 1404 def test_fail_evaluated_as_false(self): 1405 """Test polling condition which's return value is evaluated as False. 1406 1407 Expect TimeoutError raised. 1408 """ 1409 with self.assertRaisesRegexp( 1410 utils.TimeoutError, 1411 'Timed out waiting for unamed condition'): 1412 utils.poll_for_condition_ex(lambda: 0, timeout=3, 1413 sleep_interval=1) 1414 self.assertEqual(2, self.time_mock.sleep.call_count) 1415 1416 with self.assertRaisesRegexp( 1417 utils.TimeoutError, 1418 'Timed out waiting for unamed condition'): 1419 utils.poll_for_condition_ex(lambda: None, timeout=3, 1420 sleep_interval=1) 1421 1422 1423 def test_desc_arg(self): 1424 """Test polling condition always fails with desc. 1425 1426 Expect TimeoutError with condition description embedded. 1427 """ 1428 with self.assertRaisesRegexp( 1429 utils.TimeoutError, 1430 'Timed out waiting for always false condition'): 1431 utils.poll_for_condition_ex(lambda: False, 1432 desc='always false condition', 1433 timeout=3, sleep_interval=1) 1434 self.assertEqual(2, self.time_mock.sleep.call_count) 1435 1436 1437 def test_exception(self): 1438 """Test polling condition that raises. 1439 1440 Expect TimeoutError with condition raised exception embedded. 1441 """ 1442 with self.assertRaisesRegexp( 1443 utils.TimeoutError, 1444 "Reason: Exception\('always raise',\)"): 1445 utils.poll_for_condition_ex(always_raise, timeout=3, 1446 sleep_interval=1) 1447 self.assertEqual(2, self.time_mock.sleep.call_count) 1448 1449 1450 def test_ok_after_retry(self): 1451 """Test polling a condition which is success after retry twice. 1452 """ 1453 self.assertTrue(utils.poll_for_condition_ex(fail_n_times(2), timeout=3, 1454 sleep_interval=1)) 1455 1456 1457 def test_cannot_wait(self): 1458 """Test polling a condition which fails till timeout. 1459 """ 1460 with self.assertRaisesRegexp( 1461 utils.TimeoutError, 1462 'condition evaluted as false'): 1463 utils.poll_for_condition_ex(fail_n_times(3), timeout=3, 1464 sleep_interval=1) 1465 self.assertEqual(2, self.time_mock.sleep.call_count) 1466 1467 1468class test_timer(TimeModuleMockTestCase): 1469 """Test Timer. 1470 """ 1471 1472 def test_zero_timeout(self): 1473 """Test Timer with zero timeout. 1474 1475 Only the first timer.sleep(0) is True. 1476 """ 1477 timer = utils.Timer(0) 1478 self.assertTrue(timer.sleep(0)) 1479 self.assertFalse(timer.sleep(0)) 1480 self.time_mock.sleep.assert_not_called() 1481 1482 1483 def test_sleep(self): 1484 """Test Timer.sleep() 1485 """ 1486 timeout = 3 1487 sleep_interval = 2 1488 timer = utils.Timer(timeout) 1489 1490 # Kicks off timer. 1491 self.assertTrue(timer.sleep(sleep_interval)) 1492 self.assertEqual(self.fake_time_begin + timeout, timer.deadline) 1493 self.assertTrue(timer.sleep(sleep_interval)) 1494 # now: 12. 12 + 2 > 13, unable to sleep 1495 self.assertFalse(timer.sleep(sleep_interval)) 1496 1497 self.time_mock.sleep.assert_has_calls([pymock.call(sleep_interval)]) 1498 1499 1500class test_timeout_error(unittest.TestCase): 1501 """Test TimeoutError. 1502 1503 Test TimeoutError with three invocations format. 1504 """ 1505 1506 def test_no_args(self): 1507 """Create TimeoutError without arguments. 1508 """ 1509 e = utils.TimeoutError() 1510 self.assertEqual('', str(e)) 1511 self.assertEqual('TimeoutError()', repr(e)) 1512 1513 1514 def test_with_message(self): 1515 """Create TimeoutError with text message. 1516 """ 1517 e = utils.TimeoutError(message='Waiting for condition') 1518 self.assertEqual('Waiting for condition', str(e)) 1519 self.assertEqual("TimeoutError('Waiting for condition',)", repr(e)) 1520 1521 # Positional message argument for backward compatibility. 1522 e = utils.TimeoutError('Waiting for condition') 1523 self.assertEqual('Waiting for condition', str(e)) 1524 self.assertEqual("TimeoutError('Waiting for condition',)", repr(e)) 1525 1526 1527 1528 def test_with_reason(self): 1529 """Create TimeoutError with reason only. 1530 """ 1531 e = utils.TimeoutError(reason='illegal input') 1532 self.assertEqual("Reason: 'illegal input'", str(e)) 1533 self.assertEqual("TimeoutError(\"Reason: 'illegal input'\",)", repr(e)) 1534 self.assertEqual('illegal input', e.reason) 1535 1536 1537 def test_with_message_reason(self): 1538 """Create TimeoutError with text message and reason. 1539 """ 1540 e = utils.TimeoutError(message='Waiting for condition', 1541 reason='illegal input') 1542 self.assertEqual("Waiting for condition. Reason: 'illegal input'", 1543 str(e)) 1544 self.assertEqual('illegal input', e.reason) 1545 1546 # Positional message argument for backward compatibility. 1547 e = utils.TimeoutError('Waiting for condition', reason='illegal input') 1548 self.assertEqual("Waiting for condition. Reason: 'illegal input'", 1549 str(e)) 1550 self.assertEqual('illegal input', e.reason) 1551 1552 1553 def test_with_message_reason_object(self): 1554 """Create TimeoutError with text message and reason as exception object. 1555 """ 1556 e = utils.TimeoutError(message='Waiting for condition', 1557 reason=Exception('illegal input')) 1558 self.assertEqual( 1559 "Waiting for condition. Reason: Exception('illegal input',)", 1560 str(e)) 1561 self.assertIsInstance(e.reason, Exception) 1562 self.assertEqual('illegal input', str(e.reason)) 1563 1564 # Positional message argument for backward compatibility. 1565 e = utils.TimeoutError('Waiting for condition', 1566 reason=Exception('illegal input')) 1567 self.assertEqual( 1568 "Waiting for condition. Reason: Exception('illegal input',)", 1569 str(e)) 1570 self.assertIsInstance(e.reason, Exception) 1571 self.assertEqual('illegal input', str(e.reason)) 1572 1573 1574 1575if __name__ == "__main__": 1576 unittest.main() 1577