xref: /aosp_15_r20/external/autotest/client/common_lib/utils_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2
3# pylint: disable=missing-docstring
4
5from __future__ import absolute_import, division, print_function
6
7import errno
8import io
9import itertools
10import logging
11import os
12import select
13import socket
14import subprocess
15import time
16import unittest
17
18# mock 1.0.0 (in site-packages/chromite/third_party/mock.py)
19# which is an ancestor of Python's default library starting from Python 3.3.
20# See https://docs.python.org/3/library/unittest.mock.html
21import mock as pymock
22import six
23from six.moves import range, urllib
24
25import common
26
27from autotest_lib.client.common_lib import autotemp, utils
28from autotest_lib.client.common_lib.test_utils import mock
29
30metrics = utils.metrics_mock
31
32
33class test_read_one_line(unittest.TestCase):
34    def setUp(self):
35        self.god = mock.mock_god(ut=self)
36        self.god.stub_function(utils, "open")
37
38
39    def tearDown(self):
40        self.god.unstub_all()
41
42
43    def test_ip_to_long(self):
44        self.assertEqual(utils.ip_to_long('0.0.0.0'), 0)
45        self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295)
46        self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521)
47        self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320)
48
49
50    def test_long_to_ip(self):
51        self.assertEqual(utils.long_to_ip(0), '0.0.0.0')
52        self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255')
53        self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1')
54        self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8')
55
56
57    def test_create_subnet_mask(self):
58        self.assertEqual(utils.create_subnet_mask(0), 0)
59        self.assertEqual(utils.create_subnet_mask(32), 4294967295)
60        self.assertEqual(utils.create_subnet_mask(25), 4294967168)
61
62
63    def test_format_ip_with_mask(self):
64        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0),
65                         '0.0.0.0/0')
66        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32),
67                         '192.168.0.1/32')
68        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26),
69                         '192.168.0.0/26')
70        self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26),
71                         '192.168.0.192/26')
72
73
74    def create_test_file(self, contents):
75        test_file = six.StringIO(contents)
76        utils.open.expect_call("filename", "r").and_return(test_file)
77
78
79    def test_reads_one_line_file(self):
80        self.create_test_file("abc\n")
81        self.assertEqual("abc", utils.read_one_line("filename"))
82        self.god.check_playback()
83
84
85    def test_strips_read_lines(self):
86        self.create_test_file("abc   \n")
87        self.assertEqual("abc   ", utils.read_one_line("filename"))
88        self.god.check_playback()
89
90
91    def test_drops_extra_lines(self):
92        self.create_test_file("line 1\nline 2\nline 3\n")
93        self.assertEqual("line 1", utils.read_one_line("filename"))
94        self.god.check_playback()
95
96
97    def test_works_on_empty_file(self):
98        self.create_test_file("")
99        self.assertEqual("", utils.read_one_line("filename"))
100        self.god.check_playback()
101
102
103    def test_works_on_file_with_no_newlines(self):
104        self.create_test_file("line but no newline")
105        self.assertEqual("line but no newline",
106                         utils.read_one_line("filename"))
107        self.god.check_playback()
108
109
110    def test_preserves_leading_whitespace(self):
111        self.create_test_file("   has leading whitespace")
112        self.assertEqual("   has leading whitespace",
113                         utils.read_one_line("filename"))
114
115
116class test_write_one_line(unittest.TestCase):
117    def setUp(self):
118        self.god = mock.mock_god(ut=self)
119        self.god.stub_function(utils, "open")
120
121
122    def tearDown(self):
123        self.god.unstub_all()
124
125
126    def get_write_one_line_output(self, content):
127        test_file = mock.SaveDataAfterCloseStringIO()
128        utils.open.expect_call("filename", "w").and_return(test_file)
129        utils.write_one_line("filename", content)
130        self.god.check_playback()
131        return test_file.final_data
132
133
134    def test_writes_one_line_file(self):
135        self.assertEqual("abc\n", self.get_write_one_line_output("abc"))
136
137
138    def test_preserves_existing_newline(self):
139        self.assertEqual("abc\n", self.get_write_one_line_output("abc\n"))
140
141
142    def test_preserves_leading_whitespace(self):
143        self.assertEqual("   abc\n", self.get_write_one_line_output("   abc"))
144
145
146    def test_preserves_trailing_whitespace(self):
147        self.assertEqual("abc   \n", self.get_write_one_line_output("abc   "))
148
149
150    def test_handles_empty_input(self):
151        self.assertEqual("\n", self.get_write_one_line_output(""))
152
153
154class test_open_write_close(unittest.TestCase):
155    def setUp(self):
156        self.god = mock.mock_god(ut=self)
157        self.god.stub_function(utils, "open")
158
159
160    def tearDown(self):
161        self.god.unstub_all()
162
163
164    def test_simple_functionality(self):
165        data = "\n\nwhee\n"
166        test_file = mock.SaveDataAfterCloseStringIO()
167        utils.open.expect_call("filename", "w").and_return(test_file)
168        utils.open_write_close("filename", data)
169        self.god.check_playback()
170        self.assertEqual(data, test_file.final_data)
171
172    def test_binary_functionality(self):
173        data = bytearray([0, 1, 3, 23, 0, 71, 254, 255, 127, 128])
174        test_file = mock.SaveDataAfterCloseBytesIO()
175        utils.open.expect_call("filename", "wb").and_return(test_file)
176        utils.open_write_close("filename", data, is_binary=True)
177        self.god.check_playback()
178        self.assertEqual(data, test_file.final_data)
179
180
181class test_read_keyval(unittest.TestCase):
182    def setUp(self):
183        self.god = mock.mock_god(ut=self)
184        self.god.stub_function(utils, "open")
185        self.god.stub_function(os.path, "isdir")
186        self.god.stub_function(os.path, "exists")
187
188
189    def tearDown(self):
190        self.god.unstub_all()
191
192
193    def create_test_file(self, filename, contents):
194        test_file = six.StringIO(contents)
195        os.path.exists.expect_call(filename).and_return(True)
196        utils.open.expect_call(filename).and_return(test_file)
197
198
199    def read_keyval(self, contents):
200        os.path.isdir.expect_call("file").and_return(False)
201        self.create_test_file("file", contents)
202        keyval = utils.read_keyval("file")
203        self.god.check_playback()
204        return keyval
205
206
207    def test_returns_empty_when_file_doesnt_exist(self):
208        os.path.isdir.expect_call("file").and_return(False)
209        os.path.exists.expect_call("file").and_return(False)
210        self.assertEqual({}, utils.read_keyval("file"))
211        self.god.check_playback()
212
213
214    def test_accesses_files_directly(self):
215        os.path.isdir.expect_call("file").and_return(False)
216        self.create_test_file("file", "")
217        utils.read_keyval("file")
218        self.god.check_playback()
219
220
221    def test_accesses_directories_through_keyval_file(self):
222        os.path.isdir.expect_call("dir").and_return(True)
223        self.create_test_file("dir/keyval", "")
224        utils.read_keyval("dir")
225        self.god.check_playback()
226
227
228    def test_values_are_rstripped(self):
229        keyval = self.read_keyval("a=b   \n")
230        self.assertEquals(keyval, {"a": "b"})
231
232
233    def test_comments_are_ignored(self):
234        keyval = self.read_keyval("a=b # a comment\n")
235        self.assertEquals(keyval, {"a": "b"})
236
237
238    def test_integers_become_ints(self):
239        keyval = self.read_keyval("a=1\n")
240        self.assertEquals(keyval, {"a": 1})
241        self.assertEquals(int, type(keyval["a"]))
242
243
244    def test_float_values_become_floats(self):
245        keyval = self.read_keyval("a=1.5\n")
246        self.assertEquals(keyval, {"a": 1.5})
247        self.assertEquals(float, type(keyval["a"]))
248
249
250    def test_multiple_lines(self):
251        keyval = self.read_keyval("a=one\nb=two\n")
252        self.assertEquals(keyval, {"a": "one", "b": "two"})
253
254
255    def test_the_last_duplicate_line_is_used(self):
256        keyval = self.read_keyval("a=one\nb=two\na=three\n")
257        self.assertEquals(keyval, {"a": "three", "b": "two"})
258
259
260    def test_extra_equals_are_included_in_values(self):
261        keyval = self.read_keyval("a=b=c\n")
262        self.assertEquals(keyval, {"a": "b=c"})
263
264
265    def test_non_alphanumeric_keynames_are_rejected(self):
266        self.assertRaises(ValueError, self.read_keyval, "a$=one\n")
267
268
269    def test_underscores_are_allowed_in_key_names(self):
270        keyval = self.read_keyval("a_b=value\n")
271        self.assertEquals(keyval, {"a_b": "value"})
272
273
274    def test_dashes_are_allowed_in_key_names(self):
275        keyval = self.read_keyval("a-b=value\n")
276        self.assertEquals(keyval, {"a-b": "value"})
277
278    def test_empty_value_is_allowed(self):
279        keyval = self.read_keyval("a=\n")
280        self.assertEquals(keyval, {"a": ""})
281
282
283class test_write_keyval(unittest.TestCase):
284    def setUp(self):
285        self.god = mock.mock_god(ut=self)
286        self.god.stub_function(utils, "open")
287        self.god.stub_function(os.path, "isdir")
288
289
290    def tearDown(self):
291        self.god.unstub_all()
292
293
294    def assertHasLines(self, value, lines):
295        vlines = value.splitlines()
296        vlines.sort()
297        self.assertEquals(vlines, sorted(lines))
298
299
300    def write_keyval(self, filename, dictionary, expected_filename=None,
301                     type_tag=None):
302        if expected_filename is None:
303            expected_filename = filename
304        test_file = six.StringIO()
305        self.god.stub_function(test_file, "close")
306        utils.open.expect_call(expected_filename, "a").and_return(test_file)
307        test_file.close.expect_call()
308        if type_tag is None:
309            utils.write_keyval(filename, dictionary)
310        else:
311            utils.write_keyval(filename, dictionary, type_tag)
312        return test_file.getvalue()
313
314
315    def write_keyval_file(self, dictionary, type_tag=None):
316        os.path.isdir.expect_call("file").and_return(False)
317        return self.write_keyval("file", dictionary, type_tag=type_tag)
318
319
320    def test_accesses_files_directly(self):
321        os.path.isdir.expect_call("file").and_return(False)
322        result = self.write_keyval("file", {"a": "1"})
323        self.assertEquals(result, "a=1\n")
324
325
326    def test_accesses_directories_through_keyval_file(self):
327        os.path.isdir.expect_call("dir").and_return(True)
328        result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
329        self.assertEquals(result, "b=2\n")
330
331
332    def test_numbers_are_stringified(self):
333        result = self.write_keyval_file({"c": 3})
334        self.assertEquals(result, "c=3\n")
335
336
337    def test_type_tags_are_excluded_by_default(self):
338        result = self.write_keyval_file({"d": "a string"})
339        self.assertEquals(result, "d=a string\n")
340        self.assertRaises(ValueError, self.write_keyval_file,
341                          {"d{perf}": "a string"})
342
343
344    def test_perf_tags_are_allowed(self):
345        result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
346                                        type_tag="perf")
347        self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
348        self.assertRaises(ValueError, self.write_keyval_file,
349                          {"a": 1, "b": 2}, type_tag="perf")
350
351
352    def test_non_alphanumeric_keynames_are_rejected(self):
353        self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})
354
355
356    def test_underscores_are_allowed_in_key_names(self):
357        result = self.write_keyval_file({"a_b": "value"})
358        self.assertEquals(result, "a_b=value\n")
359
360
361    def test_dashes_are_allowed_in_key_names(self):
362        result = self.write_keyval_file({"a-b": "value"})
363        self.assertEquals(result, "a-b=value\n")
364
365
366class test_is_url(unittest.TestCase):
367    def test_accepts_http(self):
368        self.assertTrue(utils.is_url("http://example.com"))
369
370
371    def test_accepts_ftp(self):
372        self.assertTrue(utils.is_url("ftp://ftp.example.com"))
373
374
375    def test_rejects_local_path(self):
376        self.assertFalse(utils.is_url("/home/username/file"))
377
378
379    def test_rejects_local_filename(self):
380        self.assertFalse(utils.is_url("filename"))
381
382
383    def test_rejects_relative_local_path(self):
384        self.assertFalse(utils.is_url("somedir/somesubdir/file"))
385
386
387    def test_rejects_local_path_containing_url(self):
388        self.assertFalse(utils.is_url("somedir/http://path/file"))
389
390
391class test_urlopen(unittest.TestCase):
392    def setUp(self):
393        self.god = mock.mock_god(ut=self)
394
395
396    def tearDown(self):
397        self.god.unstub_all()
398
399
400    def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
401                                             *expected_args):
402        expected_args += (None,) * (2 - len(expected_args))
403        def urlopen(url, data=None):
404            self.assertEquals(expected_args, (url,data))
405            test_func(socket.getdefaulttimeout())
406            return expected_return
407        self.god.stub_with(urllib.request, "urlopen", urlopen)
408
409
410    def stub_urlopen_with_timeout_check(self, expected_timeout,
411                                        expected_return, *expected_args):
412        def test_func(timeout):
413            self.assertEquals(timeout, expected_timeout)
414        self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
415                                                  *expected_args)
416
417
418    def test_timeout_set_during_call(self):
419        self.stub_urlopen_with_timeout_check(30, "retval", "url")
420        retval = utils.urlopen("url", timeout=30)
421        self.assertEquals(retval, "retval")
422
423
424    def test_timeout_reset_after_call(self):
425        old_timeout = socket.getdefaulttimeout()
426        self.stub_urlopen_with_timeout_check(30, None, "url")
427        try:
428            socket.setdefaulttimeout(1234)
429            utils.urlopen("url", timeout=30)
430            self.assertEquals(1234, socket.getdefaulttimeout())
431        finally:
432            socket.setdefaulttimeout(old_timeout)
433
434
435    def test_timeout_set_by_default(self):
436        def test_func(timeout):
437            self.assertTrue(timeout is not None)
438        self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
439        utils.urlopen("url")
440
441
442    def test_args_are_untouched(self):
443        self.stub_urlopen_with_timeout_check(30, None, "http://url",
444                                             "POST data")
445        utils.urlopen("http://url", timeout=30, data="POST data")
446
447
448class test_urlretrieve(unittest.TestCase):
449    def setUp(self):
450        self.god = mock.mock_god(ut=self)
451
452
453    def tearDown(self):
454        self.god.unstub_all()
455
456
457    def test_urlopen_passed_arguments(self):
458        self.god.stub_function(utils, "urlopen")
459        self.god.stub_function(utils.shutil, "copyfileobj")
460        self.god.stub_function(utils, "open")
461
462        url = "url"
463        dest = "somefile"
464        data = object()
465        timeout = 10
466
467        src_file = self.god.create_mock_class(io.IOBase, "file")
468        dest_file = self.god.create_mock_class(io.IOBase, "file")
469
470        (utils.urlopen.expect_call(url, data=data, timeout=timeout)
471                .and_return(src_file))
472        utils.open.expect_call(dest, "wb").and_return(dest_file)
473        utils.shutil.copyfileobj.expect_call(src_file, dest_file)
474        dest_file.close.expect_call()
475        src_file.close.expect_call()
476
477        utils.urlretrieve(url, dest, data=data, timeout=timeout)
478        self.god.check_playback()
479
480
481class test_merge_trees(unittest.TestCase):
482    # a some path-handling helper functions
483    def src(self, *path_segments):
484        return os.path.join(self.src_tree.name, *path_segments)
485
486
487    def dest(self, *path_segments):
488        return os.path.join(self.dest_tree.name, *path_segments)
489
490
491    def paths(self, *path_segments):
492        return self.src(*path_segments), self.dest(*path_segments)
493
494
495    def assertFileEqual(self, *path_segments):
496        src, dest = self.paths(*path_segments)
497        self.assertEqual(True, os.path.isfile(src))
498        self.assertEqual(True, os.path.isfile(dest))
499        self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
500        self.assertEqual(open(src).read(), open(dest).read())
501
502
503    def assertFileContents(self, contents, *path_segments):
504        dest = self.dest(*path_segments)
505        self.assertEqual(True, os.path.isfile(dest))
506        self.assertEqual(os.path.getsize(dest), len(contents))
507        self.assertEqual(contents, open(dest).read())
508
509
510    def setUp(self):
511        self.src_tree = autotemp.tempdir(unique_id='utilsrc')
512        self.dest_tree = autotemp.tempdir(unique_id='utilsdest')
513
514        # empty subdirs
515        os.mkdir(self.src("empty"))
516        os.mkdir(self.dest("empty"))
517
518
519    def tearDown(self):
520        self.src_tree.clean()
521        self.dest_tree.clean()
522
523
524    def test_both_dont_exist(self):
525        utils.merge_trees(*self.paths("empty"))
526
527
528    def test_file_only_at_src(self):
529        with open(self.src("src_only"), "w") as wf:
530            print("line 1", file=wf)
531        utils.merge_trees(*self.paths("src_only"))
532        self.assertFileEqual("src_only")
533
534
535    def test_file_only_at_dest(self):
536        with open(self.dest("dest_only"), "w") as wf:
537            print("line 1", file=wf)
538        utils.merge_trees(*self.paths("dest_only"))
539        self.assertEqual(False, os.path.exists(self.src("dest_only")))
540        self.assertFileContents("line 1\n", "dest_only")
541
542
543    def test_file_at_both(self):
544        with open(self.dest("in_both"), "w") as wf1:
545            print("line 1", file=wf1)
546        with open(self.src("in_both"), "w") as wf2:
547            print("line 2", file=wf2)
548        utils.merge_trees(*self.paths("in_both"))
549        self.assertFileContents("line 1\nline 2\n", "in_both")
550
551
552    def test_directory_with_files_in_both(self):
553        with open(self.dest("in_both"), "w") as wf1:
554            print("line 1", file=wf1)
555        with open(self.src("in_both"), "w") as wf2:
556            print("line 3", file=wf2)
557        utils.merge_trees(*self.paths())
558        self.assertFileContents("line 1\nline 3\n", "in_both")
559
560
561    def test_directory_with_mix_of_files(self):
562        with open(self.dest("in_dest"), "w") as wf1:
563            print("dest line", file=wf1)
564        with open(self.src("in_src"), "w") as wf2:
565            print("src line", file=wf2)
566        utils.merge_trees(*self.paths())
567        self.assertFileContents("dest line\n", "in_dest")
568        self.assertFileContents("src line\n", "in_src")
569
570
571    def test_directory_with_subdirectories(self):
572        os.mkdir(self.src("src_subdir"))
573        with open(self.src("src_subdir", "subfile"), "w") as wf1:
574            print("subdir line", file=wf1)
575        os.mkdir(self.src("both_subdir"))
576        os.mkdir(self.dest("both_subdir"))
577        with open(self.src("both_subdir", "subfile"), "w") as wf2:
578            print("src line", file=wf2)
579        with open(self.dest("both_subdir", "subfile"), "w") as wf3:
580            print("dest line", file=wf3)
581        utils.merge_trees(*self.paths())
582        self.assertFileContents("subdir line\n", "src_subdir", "subfile")
583        self.assertFileContents("dest line\nsrc line\n", "both_subdir",
584                                "subfile")
585
586
587class test_get_relative_path(unittest.TestCase):
588    def test_not_absolute(self):
589        self.assertRaises(AssertionError, utils.get_relative_path, "a", "b")
590
591    def test_same_dir(self):
592        self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c")
593
594    def test_forward_dir(self):
595        self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d")
596
597    def test_previous_dir(self):
598        self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..")
599
600    def test_parallel_dir(self):
601        self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"),
602                         "../../../c/d")
603
604
605class test_sh_escape(unittest.TestCase):
606    def _test_in_shell(self, text):
607        escaped_text = utils.sh_escape(text)
608        proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True,
609                                stdin=open(os.devnull, 'r'),
610                                stdout=subprocess.PIPE,
611                                stderr=open(os.devnull, 'w'))
612        stdout, _ = proc.communicate()
613        self.assertEqual(proc.returncode, 0)
614        self.assertEqual(stdout[:-1].decode(), text)
615
616
617    def test_normal_string(self):
618        self._test_in_shell('abcd')
619
620
621    def test_spaced_string(self):
622        self._test_in_shell('abcd efgh')
623
624
625    def test_dollar(self):
626        self._test_in_shell('$')
627
628
629    def test_single_quote(self):
630        self._test_in_shell('\'')
631
632
633    def test_single_quoted_string(self):
634        self._test_in_shell('\'efgh\'')
635
636
637    def test_string_with_single_quote(self):
638        self._test_in_shell("a'b")
639
640
641    def test_string_with_escaped_single_quote(self):
642        self._test_in_shell(r"a\'b")
643
644
645    def test_double_quote(self):
646        self._test_in_shell('"')
647
648
649    def test_double_quoted_string(self):
650        self._test_in_shell('"abcd"')
651
652
653    def test_backtick(self):
654        self._test_in_shell('`')
655
656
657    def test_backticked_string(self):
658        self._test_in_shell('`jklm`')
659
660
661    def test_backslash(self):
662        self._test_in_shell('\\')
663
664
665    def test_backslashed_special_characters(self):
666        self._test_in_shell('\\$')
667        self._test_in_shell('\\"')
668        self._test_in_shell('\\\'')
669        self._test_in_shell('\\`')
670
671
672    def test_backslash_codes(self):
673        self._test_in_shell('\\n')
674        self._test_in_shell('\\r')
675        self._test_in_shell('\\t')
676        self._test_in_shell('\\v')
677        self._test_in_shell('\\b')
678        self._test_in_shell('\\a')
679        self._test_in_shell('\\000')
680
681    def test_real_newline(self):
682        self._test_in_shell('\n')
683        self._test_in_shell('\\\n')
684
685
686class test_sh_quote_word(test_sh_escape):
687    """Run tests on sh_quote_word.
688
689    Inherit from test_sh_escape to get the same tests to run on both.
690    """
691
692    def _test_in_shell(self, text):
693        quoted_word = utils.sh_quote_word(text)
694        echoed_value = subprocess.check_output('echo %s' % quoted_word,
695                                               shell=True)
696        self.assertEqual(echoed_value.decode(), text + '\n')
697
698
699class test_nested_sh_quote_word(test_sh_quote_word):
700    """Run nested tests on sh_quote_word.
701
702    Inherit from test_sh_quote_word to get the same tests to run on both.
703    """
704
705    def _test_in_shell(self, text):
706        command = 'echo ' + utils.sh_quote_word(text)
707        nested_command = 'echo ' + utils.sh_quote_word(command)
708        produced_command = subprocess.check_output(nested_command, shell=True)
709        echoed_value = subprocess.check_output(produced_command, shell=True)
710        self.assertEqual(echoed_value.decode(), text + '\n')
711
712
713class test_run(unittest.TestCase):
714    """
715    Test the utils.run() function.
716
717    Note: This test runs simple external commands to test the utils.run()
718    API without assuming implementation details.
719    """
720
721    # Log levels in ascending severity.
722    LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
723                  logging.CRITICAL]
724
725
726    def setUp(self):
727        self.god = mock.mock_god(ut=self)
728        self.god.stub_function(utils.logging, 'warning')
729        self.god.stub_function(utils.logging, 'debug')
730
731        # Log level -> StringIO.StringIO.
732        self.logs = {}
733        for level in self.LOG_LEVELS:
734            self.logs[level] = six.StringIO()
735
736        # Override logging_manager.LoggingFile to return buffers.
737        def logging_file(level=None, prefix=None):
738            return self.logs[level]
739        self.god.stub_with(utils.logging_manager, 'LoggingFile', logging_file)
740
741    def tearDown(self):
742        self.god.unstub_all()
743
744
745    def __check_result(self, result, command, exit_status=0, stdout='',
746                       stderr=''):
747        self.assertEquals(result.command, command)
748        self.assertEquals(result.exit_status, exit_status)
749        self.assertEquals(result.stdout, stdout)
750        self.assertEquals(result.stderr, stderr)
751
752
753    def __get_logs(self):
754        """Returns contents of log buffers at all levels.
755
756            @return: 5-element list of strings corresponding to logged messages
757                at the levels in self.LOG_LEVELS.
758        """
759        return [self.logs[v].getvalue() for v in self.LOG_LEVELS]
760
761
762    def test_default_simple(self):
763        cmd = 'echo "hello world"'
764        # expect some king of logging.debug() call but don't care about args
765        utils.logging.debug.expect_any_call()
766        self.__check_result(utils.run(cmd), cmd, stdout='hello world\n')
767
768
769    def test_default_failure(self):
770        cmd = 'exit 11'
771        try:
772            utils.run(cmd, verbose=False)
773        except utils.error.CmdError as err:
774            self.__check_result(err.result_obj, cmd, exit_status=11)
775
776
777    def test_ignore_status(self):
778        cmd = 'echo error >&2 && exit 11'
779        self.__check_result(utils.run(cmd, ignore_status=True, verbose=False),
780                            cmd, exit_status=11, stderr='error\n')
781
782
783    def test_timeout(self):
784        # we expect a logging.warning() message, don't care about the contents
785        utils.logging.warning.expect_any_call()
786        try:
787            utils.run('echo -n output && sleep 10', timeout=1, verbose=False)
788        except utils.error.CmdError as err:
789            self.assertEquals(err.result_obj.stdout, 'output')
790
791
792    def test_stdout_stderr_tee(self):
793        cmd = 'echo output && echo error >&2'
794        stdout_tee = six.StringIO()
795        stderr_tee = six.StringIO()
796
797        self.__check_result(utils.run(
798                cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee,
799                verbose=False), cmd, stdout='output\n', stderr='error\n')
800        self.assertEqual(stdout_tee.getvalue(), 'output\n')
801        self.assertEqual(stderr_tee.getvalue(), 'error\n')
802
803
804    def test_stdin_string(self):
805        cmd = 'cat'
806        self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'),
807                            cmd, stdout='hi!\n')
808
809
810    def test_stdout_tee_to_logs_info(self):
811        """Test logging stdout at the info level."""
812        utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS,
813                  stdout_level=logging.INFO, verbose=False)
814        self.assertEqual(self.__get_logs(), ['', 'output\n', '', '', ''])
815
816
817    def test_stdout_tee_to_logs_warning(self):
818        """Test logging stdout at the warning level."""
819        utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS,
820                  stdout_level=logging.WARNING, verbose=False)
821        self.assertEqual(self.__get_logs(), ['', '', 'output\n', '', ''])
822
823
824    def test_stdout_and_stderr_tee_to_logs(self):
825        """Test simultaneous stdout and stderr log levels."""
826        utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS,
827                  stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO,
828                  stderr_level=logging.ERROR, verbose=False)
829        self.assertEqual(self.__get_logs(), ['', 'output\n', '', 'error\n', ''])
830
831
832    def test_default_expected_stderr_log_level(self):
833        """Test default expected stderr log level.
834
835        stderr should be logged at the same level as stdout when
836        stderr_is_expected is true and stderr_level isn't passed.
837        """
838        utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS,
839                  stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO,
840                  stderr_is_expected=True, verbose=False)
841        self.assertEqual(self.__get_logs(), ['', 'output\nerror\n', '', '', ''])
842
843
844    def test_safe_args(self):
845        # NOTE: The string in expected_quoted_cmd depends on the internal
846        # implementation of shell quoting which is used by utils.run(),
847        # in this case, sh_quote_word().
848        expected_quoted_cmd = "echo 'hello \"world' again"
849        self.__check_result(utils.run(
850                'echo', verbose=False, args=('hello "world', 'again')),
851                expected_quoted_cmd, stdout='hello "world again\n')
852
853
854    def test_safe_args_given_string(self):
855        self.assertRaises(TypeError, utils.run, 'echo', args='hello')
856
857
858    def test_wait_interrupt(self):
859        """Test that we actually select twice if the first one returns EINTR."""
860        utils.logging.debug.expect_any_call()
861
862        bg_job = utils.BgJob('echo "hello world"')
863        bg_job.result.exit_status = 0
864        self.god.stub_function(utils.select, 'select')
865
866        utils.select.select.expect_any_call().and_raises(
867                select.error(errno.EINTR, 'Select interrupted'))
868        utils.logging.warning.expect_any_call()
869
870        utils.select.select.expect_any_call().and_return(
871                ([bg_job.sp.stdout, bg_job.sp.stderr], [], None))
872        utils.logging.warning.expect_any_call()
873
874        self.assertFalse(
875                utils._wait_for_commands([bg_job], time.time(), None))
876
877
878class test_compare_versions(unittest.TestCase):
879    def test_zerofill(self):
880        self.assertEqual(utils.compare_versions('1.7', '1.10'), -1)
881        self.assertEqual(utils.compare_versions('1.222', '1.3'), 1)
882        self.assertEqual(utils.compare_versions('1.03', '1.3'), 0)
883
884
885    def test_unequal_len(self):
886        self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1)
887        self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1)
888
889
890    def test_dash_delimited(self):
891        self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1)
892        self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1)
893        self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0)
894
895
896    def test_alphabets(self):
897        self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1)
898        self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1)
899        self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0)
900
901
902    def test_mix_symbols(self):
903        self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1)
904        self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1)
905        self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0)
906
907        self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1)
908        self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1)
909        self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0)
910
911
912class test_args_to_dict(unittest.TestCase):
913    def test_no_args(self):
914        result = utils.args_to_dict([])
915        self.assertEqual({}, result)
916
917
918    def test_matches(self):
919        result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:',
920                                     'F__o0O=', 'B8r:=:=', '_bAZ_=:=:'])
921        self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'',
922                                  'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'})
923
924
925    def test_unmatches(self):
926        # Temporarily shut warning messages from args_to_dict() when an argument
927        # doesn't match its pattern.
928        logger = logging.getLogger()
929        saved_level = logger.level
930        logger.setLevel(logging.ERROR)
931
932        try:
933            result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b',
934                                         ':VAL', '=VVV', 'WORD'])
935            self.assertEqual({}, result)
936        finally:
937            # Restore level.
938            logger.setLevel(saved_level)
939
940
941class test_get_random_port(unittest.TestCase):
942    def do_bind(self, port, socket_type, socket_proto):
943        s = socket.socket(socket.AF_INET, socket_type, socket_proto)
944        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
945        s.bind(('', port))
946        return s
947
948
949    def test_get_port(self):
950        for _ in range(100):
951            p = utils.get_unused_port()
952            s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP)
953            self.assert_(s.getsockname())
954            s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
955            self.assert_(s.getsockname())
956
957
958def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
959    """Test global function.
960    """
961
962
963class TestClass(object):
964    """Test class.
965    """
966
967    def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
968        """Test instance function.
969        """
970
971
972    @classmethod
973    def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
974        """Test class function.
975        """
976
977
978    @staticmethod
979    def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
980        """Test static function.
981        """
982
983
984class GetFunctionArgUnittest(unittest.TestCase):
985    """Tests for method get_function_arg_value."""
986
987    def run_test(self, func, insert_arg):
988        """Run test.
989
990        @param func: Function being called with given arguments.
991        @param insert_arg: Set to True to insert an object in the argument list.
992                           This is to mock instance/class object.
993        """
994        if insert_arg:
995            args = (None, 1, 2, 3)
996        else:
997            args = (1, 2, 3)
998        for i in range(1, 7):
999            self.assertEquals(utils.get_function_arg_value(
1000                    func, 'arg%d'%i, args, {}), i)
1001
1002        self.assertEquals(utils.get_function_arg_value(
1003                func, 'arg7', args, {'arg7': 7}), 7)
1004        self.assertRaises(
1005                KeyError, utils.get_function_arg_value,
1006                func, 'arg3', args[:-1], {})
1007
1008
1009    def test_global_function(self):
1010        """Test global function.
1011        """
1012        self.run_test(test_function, False)
1013
1014
1015    def test_instance_function(self):
1016        """Test instance function.
1017        """
1018        self.run_test(TestClass().test_instance_function, True)
1019
1020
1021    def test_class_function(self):
1022        """Test class function.
1023        """
1024        self.run_test(TestClass.test_class_function, True)
1025
1026
1027    def test_static_function(self):
1028        """Test static function.
1029        """
1030        self.run_test(TestClass.test_static_function, False)
1031
1032
1033class IsInSameSubnetUnittest(unittest.TestCase):
1034    """Test is_in_same_subnet function."""
1035
1036    def test_is_in_same_subnet(self):
1037        """Test is_in_same_subnet function."""
1038        self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
1039                                                23))
1040        self.assertFalse(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
1041                                                24))
1042        self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.0.255',
1043                                                24))
1044        self.assertFalse(utils.is_in_same_subnet('191.168.0.0', '192.168.0.0',
1045                                                24))
1046
1047
1048class GetWirelessSsidUnittest(unittest.TestCase):
1049    """Test get_wireless_ssid function."""
1050
1051    DEFAULT_SSID = 'default'
1052    SSID_1 = 'ssid_1'
1053    SSID_2 = 'ssid_2'
1054    SSID_3 = 'ssid_3'
1055
1056    def test_get_wireless_ssid(self):
1057        """Test is_in_same_subnet function."""
1058        god = mock.mock_god()
1059        god.stub_function_to_return(utils.CONFIG, 'get_config_value',
1060                                    self.DEFAULT_SSID)
1061        god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex',
1062                                    {'wireless_ssid_1.2.3.4/24': self.SSID_1,
1063                                     'wireless_ssid_4.3.2.1/16': self.SSID_2,
1064                                     'wireless_ssid_4.3.2.111/32': self.SSID_3})
1065        self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100'))
1066        self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100'))
1067        self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111'))
1068        self.assertEqual(self.DEFAULT_SSID,
1069                         utils.get_wireless_ssid('100.0.0.100'))
1070
1071
1072class LaunchControlBuildParseUnittest(unittest.TestCase):
1073    """Test various parsing functions related to Launch Control builds and
1074    devices.
1075    """
1076
1077    def test_parse_launch_control_target(self):
1078        """Test parse_launch_control_target function."""
1079        target_tests = {
1080                ('shamu', 'userdebug'): 'shamu-userdebug',
1081                ('shamu', 'eng'): 'shamu-eng',
1082                ('shamu-board', 'eng'): 'shamu-board-eng',
1083                (None, None): 'bad_target',
1084                (None, None): 'target'}
1085        for result, target in target_tests.items():
1086            self.assertEqual(result, utils.parse_launch_control_target(target))
1087
1088
1089class GetOffloaderUriTest(unittest.TestCase):
1090    """Test get_offload_gsuri function."""
1091    _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket'
1092
1093    def setUp(self):
1094        self.god = mock.mock_god()
1095
1096    def tearDown(self):
1097        self.god.unstub_all()
1098
1099    def test_get_default_lab_offload_gsuri(self):
1100        """Test default lab offload gsuri ."""
1101        self.god.mock_up(utils.CONFIG, 'CONFIG')
1102        self.god.stub_function_to_return(utils, 'is_moblab', False)
1103        self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI,
1104                utils.get_offload_gsuri())
1105
1106        self.god.check_playback()
1107
1108    def test_get_default_moblab_offload_gsuri(self):
1109        self.god.mock_up(utils.CONFIG, 'CONFIG')
1110        self.god.stub_function_to_return(utils, 'is_moblab', True)
1111        utils.CONFIG.get_config_value.expect_call(
1112                'CROS', 'image_storage_server').and_return(
1113                        self._IMAGE_STORAGE_SERVER)
1114        self.god.stub_function_to_return(utils,
1115                'get_moblab_serial_number', 'test_serial_number')
1116        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
1117        expected_gsuri = '%sresults/%s/%s/' % (
1118                self._IMAGE_STORAGE_SERVER, 'test_serial_number', 'test_id')
1119        cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI
1120        utils.DEFAULT_OFFLOAD_GSURI = None
1121        gsuri = utils.get_offload_gsuri()
1122        utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri
1123        self.assertEqual(expected_gsuri, gsuri)
1124
1125        self.god.check_playback()
1126
1127    def test_get_moblab_offload_gsuri(self):
1128        """Test default lab offload gsuri ."""
1129        self.god.mock_up(utils.CONFIG, 'CONFIG')
1130        self.god.stub_function_to_return(utils, 'is_moblab', True)
1131        self.god.stub_function_to_return(utils,
1132                'get_moblab_serial_number', 'test_serial_number')
1133        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
1134        gsuri = '%s%s/%s/' % (
1135                utils.DEFAULT_OFFLOAD_GSURI, 'test_serial_number', 'test_id')
1136        self.assertEqual(gsuri, utils.get_offload_gsuri())
1137
1138        self.god.check_playback()
1139
1140
1141
1142class  MockMetricsTest(unittest.TestCase):
1143    """Test metrics mock class can handle various metrics calls."""
1144
1145    def test_Counter(self):
1146        """Test the mock class can create an instance and call any method.
1147        """
1148        c = metrics.Counter('counter')
1149        c.increment(fields={'key': 1})
1150
1151
1152    def test_Context(self):
1153        """Test the mock class can handle context class.
1154        """
1155        test_value = None
1156        with metrics.SecondsTimer('context') as t:
1157            test_value = 'called_in_context'
1158            t['random_key'] = 'pass'
1159        self.assertEqual('called_in_context', test_value)
1160
1161
1162    def test_decorator(self):
1163        """Test the mock class can handle decorator.
1164        """
1165        class TestClass(object):
1166
1167            def __init__(self):
1168                self.value = None
1169
1170        test_value = TestClass()
1171        test_value.value = None
1172        @metrics.SecondsTimerDecorator('decorator')
1173        def test(arg):
1174            arg.value = 'called_in_decorator'
1175
1176        test(test_value)
1177        self.assertEqual('called_in_decorator', test_value.value)
1178
1179
1180    def test_setitem(self):
1181        """Test the mock class can handle set item call.
1182        """
1183        timer = metrics.SecondsTimer('name')
1184        timer['random_key'] = 'pass'
1185
1186
1187class test_background_sample(unittest.TestCase):
1188    """Test that the background sample can sample as desired.
1189    """
1190
1191    def test_can_sample(self):
1192        """Test that a simple sample will work with no other complications.
1193        """
1194        should_be_sampled = 'name'
1195
1196        def sample_function():
1197            """Return value of variable stored in method."""
1198            return should_be_sampled
1199        still_sampling = True
1200
1201        t = utils.background_sample_until_condition(
1202                function=sample_function,
1203                condition=lambda: still_sampling,
1204                timeout=5,
1205                sleep_interval=0.1)
1206        result = t.finish()
1207        self.assertIn(should_be_sampled, result)
1208
1209
1210    def test_samples_multiple_values(self):
1211        """Test that a sample will work and actually samples at the necessary
1212        intervals, such that it will pick up changes.
1213        """
1214        should_be_sampled = 'name'
1215
1216        def sample_function():
1217            """Return value of variable stored in method."""
1218            return should_be_sampled
1219        still_sampling = True
1220
1221        t = utils.background_sample_until_condition(
1222                function=sample_function,
1223                condition=lambda: still_sampling,
1224                timeout=5,
1225                sleep_interval=0.1)
1226        # Let it sample values some with the initial value.
1227        time.sleep(2.5)
1228        # It should also sample some with the new value.
1229        should_be_sampled = 'noname'
1230        result = t.finish()
1231        self.assertIn('name', result)
1232        self.assertIn('noname', result)
1233
1234
1235class FakeTime(object):
1236    """Provides time() and sleep() for faking time module.
1237    """
1238
1239    def __init__(self, start_time):
1240        self._time = start_time
1241
1242
1243    def time(self):
1244        return self._time
1245
1246
1247    def sleep(self, interval):
1248        self._time += interval
1249
1250
1251class TimeModuleMockTestCase(unittest.TestCase):
1252    """Mocks up utils.time with a FakeTime.
1253
1254    It substitudes time.time() and time.sleep() with FakeTime.time()
1255    and FakeTime.sleep(), respectively.
1256    """
1257
1258    def setUp(self):
1259        self.fake_time_begin = 10
1260        self.fake_time = FakeTime(self.fake_time_begin)
1261        self.patcher = pymock.patch(
1262            'autotest_lib.client.common_lib.utils.time')
1263        self.time_mock = self.patcher.start()
1264        self.addCleanup(self.patcher.stop)
1265        self.time_mock.time.side_effect = self.fake_time.time
1266        self.time_mock.sleep.side_effect = self.fake_time.sleep
1267
1268
1269def always_raise():
1270    """A function that raises an exception."""
1271    raise Exception('always raise')
1272
1273
1274def fail_n_times(count):
1275    """Creates a function that returns False for the first count-th calls.
1276
1277    @return a function returns False for the first count-th calls and True
1278            afterwards.
1279    """
1280    counter = itertools.count(count, -1)
1281    return lambda: next(counter) <= 0
1282
1283
1284class test_poll_for_condition(TimeModuleMockTestCase):
1285    """Test poll_for_condition.
1286    """
1287
1288    def test_ok(self):
1289        """Test polling condition that returns True.
1290        """
1291        self.assertTrue(utils.poll_for_condition(lambda: True))
1292
1293
1294    def test_ok_evaluated_as_true(self):
1295        """Test polling condition which's return value is evaluated as True.
1296        """
1297        self.assertEqual(1, utils.poll_for_condition(lambda: 1))
1298
1299        self.assertEqual('something',
1300                         utils.poll_for_condition(lambda: 'something'))
1301
1302
1303    def test_fail(self):
1304        """Test polling condition that returns False.
1305
1306        Expect TimeoutError exception as neither customized exception nor
1307        exception raised from condition().
1308        """
1309        with self.assertRaises(utils.TimeoutError):
1310            utils.poll_for_condition(lambda: False, timeout=3, sleep_interval=1)
1311        self.assertEqual(3, self.time_mock.sleep.call_count)
1312
1313
1314    def test_fail_evaluated_as_false(self):
1315        """Test polling condition which's return value is evaluated as False.
1316
1317        Expect TimeoutError exception as neither customized exception nor
1318        exception raised from condition().
1319        """
1320        with self.assertRaises(utils.TimeoutError):
1321            utils.poll_for_condition(lambda: 0, timeout=3, sleep_interval=1)
1322        self.assertEqual(3, self.time_mock.sleep.call_count)
1323
1324        with self.assertRaises(utils.TimeoutError):
1325            utils.poll_for_condition(lambda: None, timeout=3, sleep_interval=1)
1326
1327
1328    def test_exception_arg(self):
1329        """Test polling condition always fails.
1330
1331        Expect exception raised by 'exception' args.
1332        """
1333        with self.assertRaisesRegexp(Exception, 'from args'):
1334            utils.poll_for_condition(lambda: False,
1335                                     exception=Exception('from args'),
1336                                     timeout=3, sleep_interval=1)
1337        self.assertEqual(3, self.time_mock.sleep.call_count)
1338
1339
1340    def test_exception_from_condition(self):
1341        """Test polling condition always fails.
1342
1343        Expect exception raised by condition().
1344        """
1345        with self.assertRaisesRegexp(Exception, 'always raise'):
1346            utils.poll_for_condition(always_raise,
1347                                     exception=Exception('from args'),
1348                                     timeout=3, sleep_interval=1)
1349        # For poll_for_condition, if condition() raises exception, it raises
1350        # immidiately without retry. So sleep() should not be called.
1351        self.time_mock.sleep.assert_not_called()
1352
1353
1354    def test_ok_after_retry(self):
1355        """Test polling a condition which is success after retry twice.
1356        """
1357        self.assertTrue(utils.poll_for_condition(fail_n_times(2), timeout=3,
1358                                                 sleep_interval=1))
1359
1360
1361    def test_cannot_wait(self):
1362        """Test polling a condition which fails till timeout.
1363        """
1364        with self.assertRaisesRegexp(
1365                utils.TimeoutError,
1366                'Timed out waiting for unnamed condition'):
1367            utils.poll_for_condition(fail_n_times(4), timeout=3,
1368                                     sleep_interval=1)
1369        self.assertEqual(3, self.time_mock.sleep.call_count)
1370
1371
1372class test_poll_for_condition_ex(TimeModuleMockTestCase):
1373    """Test poll_for_condition_ex.
1374    """
1375
1376    def test_ok(self):
1377        """Test polling condition that returns True.
1378        """
1379        self.assertTrue(utils.poll_for_condition_ex(lambda: True))
1380
1381
1382    def test_ok_evaluated_as_true(self):
1383        """Test polling condition which's return value is evaluated as True.
1384        """
1385        self.assertEqual(1, utils.poll_for_condition_ex(lambda: 1))
1386
1387        self.assertEqual('something',
1388                         utils.poll_for_condition_ex(lambda: 'something'))
1389
1390
1391    def test_fail(self):
1392        """Test polling condition that returns False.
1393
1394        Expect TimeoutError raised.
1395        """
1396        with self.assertRaisesRegexp(
1397                utils.TimeoutError,
1398                'Timed out waiting for unamed condition'):
1399            utils.poll_for_condition_ex(lambda: False, timeout=3,
1400                                        sleep_interval=1)
1401        self.assertEqual(2, self.time_mock.sleep.call_count)
1402
1403
1404    def test_fail_evaluated_as_false(self):
1405        """Test polling condition which's return value is evaluated as False.
1406
1407        Expect TimeoutError raised.
1408        """
1409        with self.assertRaisesRegexp(
1410                utils.TimeoutError,
1411                'Timed out waiting for unamed condition'):
1412            utils.poll_for_condition_ex(lambda: 0, timeout=3,
1413                                        sleep_interval=1)
1414        self.assertEqual(2, self.time_mock.sleep.call_count)
1415
1416        with self.assertRaisesRegexp(
1417                utils.TimeoutError,
1418                'Timed out waiting for unamed condition'):
1419            utils.poll_for_condition_ex(lambda: None, timeout=3,
1420                                        sleep_interval=1)
1421
1422
1423    def test_desc_arg(self):
1424        """Test polling condition always fails with desc.
1425
1426        Expect TimeoutError with condition description embedded.
1427        """
1428        with self.assertRaisesRegexp(
1429                utils.TimeoutError,
1430                'Timed out waiting for always false condition'):
1431            utils.poll_for_condition_ex(lambda: False,
1432                                        desc='always false condition',
1433                                        timeout=3, sleep_interval=1)
1434        self.assertEqual(2, self.time_mock.sleep.call_count)
1435
1436
1437    def test_exception(self):
1438        """Test polling condition that raises.
1439
1440        Expect TimeoutError with condition raised exception embedded.
1441        """
1442        with self.assertRaisesRegexp(
1443                utils.TimeoutError,
1444                "Reason: Exception\('always raise',\)"):
1445            utils.poll_for_condition_ex(always_raise, timeout=3,
1446                                        sleep_interval=1)
1447        self.assertEqual(2, self.time_mock.sleep.call_count)
1448
1449
1450    def test_ok_after_retry(self):
1451        """Test polling a condition which is success after retry twice.
1452        """
1453        self.assertTrue(utils.poll_for_condition_ex(fail_n_times(2), timeout=3,
1454                                                    sleep_interval=1))
1455
1456
1457    def test_cannot_wait(self):
1458        """Test polling a condition which fails till timeout.
1459        """
1460        with self.assertRaisesRegexp(
1461                utils.TimeoutError,
1462                'condition evaluted as false'):
1463            utils.poll_for_condition_ex(fail_n_times(3), timeout=3,
1464                                        sleep_interval=1)
1465        self.assertEqual(2, self.time_mock.sleep.call_count)
1466
1467
1468class test_timer(TimeModuleMockTestCase):
1469    """Test Timer.
1470    """
1471
1472    def test_zero_timeout(self):
1473        """Test Timer with zero timeout.
1474
1475        Only the first timer.sleep(0) is True.
1476        """
1477        timer = utils.Timer(0)
1478        self.assertTrue(timer.sleep(0))
1479        self.assertFalse(timer.sleep(0))
1480        self.time_mock.sleep.assert_not_called()
1481
1482
1483    def test_sleep(self):
1484        """Test Timer.sleep()
1485        """
1486        timeout = 3
1487        sleep_interval = 2
1488        timer = utils.Timer(timeout)
1489
1490        # Kicks off timer.
1491        self.assertTrue(timer.sleep(sleep_interval))
1492        self.assertEqual(self.fake_time_begin + timeout, timer.deadline)
1493        self.assertTrue(timer.sleep(sleep_interval))
1494        # now: 12. 12 + 2 > 13, unable to sleep
1495        self.assertFalse(timer.sleep(sleep_interval))
1496
1497        self.time_mock.sleep.assert_has_calls([pymock.call(sleep_interval)])
1498
1499
1500class test_timeout_error(unittest.TestCase):
1501    """Test TimeoutError.
1502
1503    Test TimeoutError with three invocations format.
1504    """
1505
1506    def test_no_args(self):
1507        """Create TimeoutError without arguments.
1508        """
1509        e = utils.TimeoutError()
1510        self.assertEqual('', str(e))
1511        self.assertEqual('TimeoutError()', repr(e))
1512
1513
1514    def test_with_message(self):
1515        """Create TimeoutError with text message.
1516        """
1517        e = utils.TimeoutError(message='Waiting for condition')
1518        self.assertEqual('Waiting for condition', str(e))
1519        self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))
1520
1521        # Positional message argument for backward compatibility.
1522        e = utils.TimeoutError('Waiting for condition')
1523        self.assertEqual('Waiting for condition', str(e))
1524        self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))
1525
1526
1527
1528    def test_with_reason(self):
1529        """Create TimeoutError with reason only.
1530        """
1531        e = utils.TimeoutError(reason='illegal input')
1532        self.assertEqual("Reason: 'illegal input'", str(e))
1533        self.assertEqual("TimeoutError(\"Reason: 'illegal input'\",)", repr(e))
1534        self.assertEqual('illegal input', e.reason)
1535
1536
1537    def test_with_message_reason(self):
1538        """Create TimeoutError with text message and reason.
1539        """
1540        e = utils.TimeoutError(message='Waiting for condition',
1541                               reason='illegal input')
1542        self.assertEqual("Waiting for condition. Reason: 'illegal input'",
1543                         str(e))
1544        self.assertEqual('illegal input', e.reason)
1545
1546        # Positional message argument for backward compatibility.
1547        e = utils.TimeoutError('Waiting for condition', reason='illegal input')
1548        self.assertEqual("Waiting for condition. Reason: 'illegal input'",
1549                         str(e))
1550        self.assertEqual('illegal input', e.reason)
1551
1552
1553    def test_with_message_reason_object(self):
1554        """Create TimeoutError with text message and reason as exception object.
1555        """
1556        e = utils.TimeoutError(message='Waiting for condition',
1557                               reason=Exception('illegal input'))
1558        self.assertEqual(
1559            "Waiting for condition. Reason: Exception('illegal input',)",
1560            str(e))
1561        self.assertIsInstance(e.reason, Exception)
1562        self.assertEqual('illegal input', str(e.reason))
1563
1564        # Positional message argument for backward compatibility.
1565        e = utils.TimeoutError('Waiting for condition',
1566                               reason=Exception('illegal input'))
1567        self.assertEqual(
1568            "Waiting for condition. Reason: Exception('illegal input',)",
1569            str(e))
1570        self.assertIsInstance(e.reason, Exception)
1571        self.assertEqual('illegal input', str(e.reason))
1572
1573
1574
1575if __name__ == "__main__":
1576    unittest.main()
1577