xref: /aosp_15_r20/external/autotest/client/bin/site_sysinfo_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2
3"""Tests for site_sysinfo."""
4
5from __future__ import absolute_import
6from __future__ import division
7from __future__ import print_function
8
9
10__author__ = '[email protected] (Dan Shi)'
11
12import six.moves.cPickle as pickle
13import filecmp
14import os
15import random
16import shutil
17import tempfile
18import unittest
19
20import common
21from autotest_lib.client.bin import site_sysinfo
22from autotest_lib.client.common_lib import autotemp
23from six.moves import range
24from six.moves import zip
25
26
class diffable_logdir_test(unittest.TestCase):
    """Tests for methods in class diffable_logdir."""


    def setUp(self):
        """Initialize a temp directory with test files.

        Creates three files with random content at different depths below
        src_dir, plus one FIFO. The paths of the files that the test adds
        later are computed here but the files are not created yet.

        """
        self.tempdir = autotemp.tempdir(unique_id='diffable_logdir')
        # Register the cleanup right away: unittest skips tearDown when setUp
        # raises (for example if os.mkfifo fails) but still runs cleanups, so
        # the temp directory is never leaked.
        self.addCleanup(self.tempdir.clean)
        self.src_dir = os.path.join(self.tempdir.name, 'src')
        self.dest_dir = os.path.join(self.tempdir.name, 'dest')

        self.existing_files = ['existing_file_' + str(i) for i in range(3)]
        self.existing_files_folder = ['', 'sub', 'sub/sub2']
        self.existing_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.existing_files,
                                     self.existing_files_folder)]
        self.new_files = ['new_file_' + str(i) for i in range(2)]
        self.new_files_folder = ['sub', 'sub/sub3']
        self.new_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.new_files, self.new_files_folder)]

        # Create some files with random data in the source directory.
        for p in self.existing_files_path:
            self.append_text_to_file(str(random.random()), p)

        self.existing_fifo_path = os.path.join(
                self.src_dir, 'sub/sub2/existing_fifo')
        os.mkfifo(self.existing_fifo_path)


    def append_text_to_file(self, text, file_path):
        """Append text to the end of a file, creating the file if needed.

        Missing parent directories are created as well.

        @param text: text to be appended to a file.
        @param file_path: path to the file.

        """
        dir_name = os.path.dirname(file_path)
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
        with open(file_path, 'a') as f:
            f.write(text)


    def get_dest_path(self, src_path):
        """Get file path in dest dir from the one in src dir.

        Fails the test if src_path is not located inside self.src_dir.

        @param src_path: File path in src dir.

        @return: the path below self.dest_dir that has the same position
                 relative to it as src_path has relative to self.src_dir.

        """
        rel_path = os.path.relpath(src_path, self.src_dir)
        # Make sure src_path is a subpath of self.src_dir. A plain string
        # prefix test is not enough: '<tmp>/src_other/f' shares the prefix
        # '<tmp>/src' without being inside it, so inspect the relative path.
        escapes_src_dir = (rel_path == os.pardir or
                           rel_path.startswith(os.pardir + os.sep))
        self.assertFalse(escapes_src_dir,
                         msg='%s is not inside %s' % (src_path, self.src_dir))
        return os.path.join(self.dest_dir, rel_path)


    def assert_trees_equal(self, dir1, dir2, ignore=None):
        """Assert two directory trees contain the same files.

        Every pair of common subdirectories is compared in turn: the entry
        names must match and no common regular file may differ.

        @param dir1: the left comparison directory.
        @param dir2: the right comparison directory.
        @param ignore: filenames to ignore (in any directory).

        """
        dircmps = [filecmp.dircmp(dir1, dir2, ignore)]
        while dircmps:
            dcmp = dircmps.pop()
            self.assertEqual(dcmp.left_list, dcmp.right_list)
            self.assertEqual([], dcmp.diff_files)
            # Queue the common subdirectories for the same checks.
            dircmps.extend(dcmp.subdirs.values())


    def test_diffable_logdir_success(self):
        """Test the diff function to save new data from a directory."""
        info = site_sysinfo.diffable_logdir(self.src_dir,
                                            keep_file_hierarchy=False,
                                            append_diff_in_name=False)
        # Run the first time to collect file status.
        info.run(log_dir=None, collect_init_status=True)

        # Add new files to the test directory.
        for file_name, file_path in zip(self.new_files, self.new_files_path):
            self.append_text_to_file(file_name, file_path)

        # Temp file for existing_file_2, used to hold on the inode. If the
        # file is deleted and recreated, its inode might not change.
        existing_file_2 = self.existing_files_path[2]
        existing_file_2_tmp = existing_file_2 + '_tmp'
        os.rename(existing_file_2, existing_file_2_tmp)

        # Append data to existing files. existing_file_2 was renamed above, so
        # this creates it again as a brand new file.
        for file_name, file_path in zip(self.existing_files,
                                        self.existing_files_path):
            self.append_text_to_file(file_name, file_path)

        # Remove the tmp file.
        os.remove(existing_file_2_tmp)

        # Add a new FIFO.
        new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo')
        os.mkfifo(new_fifo_path)

        # Run the second time to do diff.
        info.run(self.dest_dir, collect_init_status=False, collect_all=True)

        # Validate files in dest_dir: each one must hold exactly the text
        # written after the initial status was collected.
        for file_name, src_path in zip(
                self.existing_files + self.new_files,
                self.existing_files_path + self.new_files_path):
            dest_path = self.get_dest_path(src_path)
            with open(dest_path, 'r') as f:
                self.assertEqual(file_name, f.read())

        # Assert that FIFOs are not in the diff.
        self.assertFalse(
                os.path.exists(self.get_dest_path(self.existing_fifo_path)),
                msg='Existing FIFO present in diff sysinfo')
        self.assertFalse(os.path.exists(self.get_dest_path(new_fifo_path)),
                         msg='New FIFO present in diff sysinfo')

        # With collect_all=True, full sysinfo should also be present. The
        # absolute source path is appended to dest_dir by plain concatenation
        # on purpose: os.path.join would discard dest_dir because src_dir is
        # absolute.
        full_sysinfo_path = self.dest_dir + self.src_dir
        self.assertTrue(os.path.exists(full_sysinfo_path),
                        msg='Full sysinfo not present')

        # Assert that the full sysinfo is a faithful copy of the source tree.
        self.assertNotEqual(self.src_dir, full_sysinfo_path)
        self.assert_trees_equal(self.src_dir, full_sysinfo_path)
161
162
class LogdirTestCase(unittest.TestCase):
    """Tests logdir.run"""

    def setUp(self):
        """Create a scratch directory with empty 'from' and 'to' subdirs."""
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

        self.from_dir = os.path.join(self.tempdir, 'from')
        self.to_dir = os.path.join(self.tempdir, 'to')
        os.mkdir(self.from_dir)
        os.mkdir(self.to_dir)

    def _destination_path(self, relative_path, from_dir=None):
        """The expected destination path for a subdir of the source directory.

        @param relative_path: path relative to the source directory.
        @param from_dir: source directory; defaults to self.from_dir.

        @return: self.to_dir followed by the full source path. The two are
                 concatenated as strings because os.path.join would drop
                 to_dir when the source path is absolute.
        """
        if from_dir is None:
            from_dir = self.from_dir
        return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path))

    def test_run_recreates_absolute_source_path(self):
        """When copying files, the absolute path from the source is recreated
        in the destination folder.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_skips_symlinks(self):
        """Symlinks inside the source directory are not copied."""
        os.mkdir(os.path.join(self.from_dir, 'real'))
        os.symlink(os.path.join(self.from_dir, 'real'),
                   os.path.join(self.from_dir, 'symlink'))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        destination_path_real = self._destination_path('real')
        self.assertTrue(os.path.exists(destination_path_real),
                        msg='real directory was not copied to %s' %
                        destination_path_real)
        destination_path_link = self._destination_path('symlink')
        # lexists does not follow the link, so a symlink that was copied but
        # ended up dangling is still detected.
        self.assertFalse(
                os.path.lexists(destination_path_link),
                msg='symlink was copied to %s' % destination_path_link)

    def test_run_resolves_symlinks_to_source_root(self):
        """run tries hard to get to the source directory before copying.

        Within the source folder, we skip any symlinks, but we first try to
        resolve symlinks to the source root itself.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        from_symlink = os.path.join(self.tempdir, 'from_symlink')
        os.symlink(self.from_dir, from_symlink)

        logdir = site_sysinfo.logdir(from_symlink)
        logdir.run(self.to_dir)

        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_excludes_common_patterns(self):
        """Entries matching the default exclude patterns are not copied."""
        os.mkdir(os.path.join(self.from_dir, 'autoserv2344'))
        # Create empty file.
        with open(os.path.join(self.from_dir, 'system.journal'), 'w'):
            pass
        deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix')
        os.makedirs(os.path.join(self.from_dir, deeper_subdir))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        for banned in ('autoserv2344', deeper_subdir, 'system.journal'):
            destination_path = self._destination_path(banned)
            self.assertFalse(os.path.exists(destination_path),
                             msg='Copied banned file to %s' % destination_path)

    def test_run_ignores_exclude_patterns_in_leading_dirs(self):
        """Exclude patterns should only be applied to path suffixes within
        from_dir, not to the root directory itself.
        """
        exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344')
        os.makedirs(os.path.join(exclude_pattern_dir, 'fubar'))
        logdir = site_sysinfo.logdir(exclude_pattern_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar',
                                                  from_dir=exclude_pattern_dir)
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_pickle_unpickle_equal(self):
        """Check pickle-unpickle round-trip."""
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        unpickled_logdir = pickle.loads(logdir_pickle)

        self.assertEqual(unpickled_logdir, logdir)

    def test_pickle_enforce_required_attributes(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.

        This case uses one extra exclude pattern on top of the defaults.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir.additional_exclude, 'a')

    def test_pickle_enforce_required_attributes_default(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.

        This case uses the default exclude patterns only.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir.additional_exclude, None)

    # NOTE: the next two tests keep their original names, but their bodies are
    # arranged so that the '_default' suffix denotes the default-excludes case,
    # mirroring the test_pickle_enforce_required_attributes pair above.

    def test_unpickle_handle_missing__excludes(self):
        """Accurately handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        This case uses one extra exclude pattern on top of the defaults.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        delattr(logdir, '_excludes')
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(
                logdir._excludes,
                (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))

    def test_unpickle_handle_missing__excludes_default(self):
        """Accurately handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        This case uses the default exclude patterns only.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        delattr(logdir, '_excludes')
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir._excludes,
                         site_sysinfo.logdir.DEFAULT_EXCLUDES)
334
335
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
338