1#!/usr/bin/python3 2 3"""Tests for site_sysinfo.""" 4 5from __future__ import absolute_import 6from __future__ import division 7from __future__ import print_function 8 9 10__author__ = '[email protected] (Dan Shi)' 11 12import six.moves.cPickle as pickle 13import filecmp 14import os 15import random 16import shutil 17import tempfile 18import unittest 19 20import common 21from autotest_lib.client.bin import site_sysinfo 22from autotest_lib.client.common_lib import autotemp 23from six.moves import range 24from six.moves import zip 25 26 27class diffable_logdir_test(unittest.TestCase): 28 """Tests for methods in class diffable_logdir.""" 29 30 31 def setUp(self): 32 """Initialize a temp direcotry with test files.""" 33 self.tempdir = autotemp.tempdir(unique_id='diffable_logdir') 34 self.src_dir = os.path.join(self.tempdir.name, 'src') 35 self.dest_dir = os.path.join(self.tempdir.name, 'dest') 36 37 self.existing_files = ['existing_file_'+str(i) for i in range(3)] 38 self.existing_files_folder = ['', 'sub', 'sub/sub2'] 39 self.existing_files_path = [os.path.join(self.src_dir, folder, f) 40 for f,folder in zip(self.existing_files, 41 self.existing_files_folder)] 42 self.new_files = ['new_file_'+str(i) for i in range(2)] 43 self.new_files_folder = ['sub', 'sub/sub3'] 44 self.new_files_path = [os.path.join(self.src_dir, folder, f) 45 for f,folder in zip(self.new_files, 46 self.new_files_folder)] 47 48 # Create some file with random data in source directory. 49 for p in self.existing_files_path: 50 self.append_text_to_file(str(random.random()), p) 51 52 self.existing_fifo_path = os.path.join( 53 self.src_dir,'sub/sub2/existing_fifo') 54 os.mkfifo(self.existing_fifo_path) 55 56 57 def tearDown(self): 58 """Clearn up.""" 59 self.tempdir.clean() 60 61 62 def append_text_to_file(self, text, file_path): 63 """Append text to the end of a file, create the file if not existed. 64 65 @param text: text to be appended to a file. 66 @param file_path: path to the file. 67 68 """ 69 dir_name = os.path.dirname(file_path) 70 if not os.path.exists(dir_name): 71 os.makedirs(dir_name) 72 with open(file_path, 'a') as f: 73 f.write(text) 74 75 76 def get_dest_path(self, src_path): 77 """Get file path in dest dir from the one in src dir. 78 79 @param src_path: File path in src dir. 80 81 """ 82 # Make sure src_path is a subpath of self.src_dir 83 self.assertEqual(os.path.commonprefix((src_path, self.src_dir)), 84 self.src_dir) 85 rel_path = os.path.relpath(src_path, self.src_dir) 86 return os.path.join(self.dest_dir, rel_path) 87 88 def assert_trees_equal(self, dir1, dir2, ignore=None): 89 """Assert two directory trees contain the same files. 90 91 @param dir1: the left comparison directory. 92 @param dir2: the right comparison directory. 93 @param ignore: filenames to ignore (in any directory). 94 95 """ 96 dircmps = [] 97 dircmps.append(filecmp.dircmp(dir1, dir2, ignore)) 98 while dircmps: 99 dcmp = dircmps.pop() 100 self.assertEqual(dcmp.left_list, dcmp.right_list) 101 self.assertEqual([], dcmp.diff_files) 102 dircmps.extend(dcmp.subdirs.values()) 103 104 105 def test_diffable_logdir_success(self): 106 """Test the diff function to save new data from a directory.""" 107 info = site_sysinfo.diffable_logdir(self.src_dir, 108 keep_file_hierarchy=False, 109 append_diff_in_name=False) 110 # Run the first time to collect file status. 111 info.run(log_dir=None, collect_init_status=True) 112 113 # Add new files to the test directory. 114 for file_name, file_path in zip(self.new_files, 115 self.new_files_path): 116 self.append_text_to_file(file_name, file_path) 117 118 # Temp file for existing_file_2, used to hold on the inode. If the 119 # file is deleted and recreated, its inode might not change. 120 existing_file_2 = self.existing_files_path[2] 121 existing_file_2_tmp = existing_file_2 + '_tmp' 122 os.rename(existing_file_2, existing_file_2_tmp) 123 124 # Append data to existing file. 125 for file_name, file_path in zip(self.existing_files, 126 self.existing_files_path): 127 self.append_text_to_file(file_name, file_path) 128 129 # Remove the tmp file. 130 os.remove(existing_file_2_tmp) 131 132 # Add a new FIFO 133 new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo') 134 os.mkfifo(new_fifo_path) 135 136 # Run the second time to do diff. 137 info.run(self.dest_dir, collect_init_status=False, collect_all=True) 138 139 # Validate files in dest_dir. 140 for file_name, file_path in zip(self.existing_files+self.new_files, 141 self.existing_files_path+self.new_files_path): 142 file_path = self.get_dest_path(file_path) 143 with open(file_path, 'r') as f: 144 self.assertEqual(file_name, f.read()) 145 146 # Assert that FIFOs are not in the diff. 147 self.assertFalse(os.path.exists( 148 self.get_dest_path(self.existing_fifo_path)), 149 msg='Existing FIFO present in diff sysinfo') 150 self.assertFalse(os.path.exists(self.get_dest_path(new_fifo_path)), 151 msg='New FIFO present in diff sysinfo') 152 153 # With collect_all=True, full sysinfo should also be present. 154 full_sysinfo_path = self.dest_dir + self.src_dir 155 self.assertTrue(os.path.exists(full_sysinfo_path), 156 msg='Full sysinfo not present') 157 158 # Assert that the full sysinfo is present. 159 self.assertNotEqual(self.src_dir, full_sysinfo_path) 160 self.assert_trees_equal(self.src_dir, full_sysinfo_path) 161 162 163class LogdirTestCase(unittest.TestCase): 164 """Tests logdir.run""" 165 166 def setUp(self): 167 self.tempdir = tempfile.mkdtemp() 168 self.addCleanup(shutil.rmtree, self.tempdir) 169 170 self.from_dir = os.path.join(self.tempdir, 'from') 171 self.to_dir = os.path.join(self.tempdir, 'to') 172 os.mkdir(self.from_dir) 173 os.mkdir(self.to_dir) 174 175 def _destination_path(self, relative_path, from_dir=None): 176 """The expected destination path for a subdir of the source directory""" 177 if from_dir is None: 178 from_dir = self.from_dir 179 return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path)) 180 181 def test_run_recreates_absolute_source_path(self): 182 """When copying files, the absolute path from the source is recreated 183 in the destination folder. 184 """ 185 os.mkdir(os.path.join(self.from_dir, 'fubar')) 186 logdir = site_sysinfo.logdir(self.from_dir) 187 logdir.run(self.to_dir) 188 destination_path= self._destination_path('fubar') 189 self.assertTrue(os.path.exists(destination_path), 190 msg='Failed to copy to %s' % destination_path) 191 192 def test_run_skips_symlinks(self): 193 os.mkdir(os.path.join(self.from_dir, 'real')) 194 os.symlink(os.path.join(self.from_dir, 'real'), 195 os.path.join(self.from_dir, 'symlink')) 196 197 logdir = site_sysinfo.logdir(self.from_dir) 198 logdir.run(self.to_dir) 199 200 destination_path_real = self._destination_path('real') 201 self.assertTrue(os.path.exists(destination_path_real), 202 msg='real directory was not copied to %s' % 203 destination_path_real) 204 destination_path_link = self._destination_path('symlink') 205 self.assertFalse( 206 os.path.exists(destination_path_link), 207 msg='symlink was copied to %s' % destination_path_link) 208 209 def test_run_resolves_symlinks_to_source_root(self): 210 """run tries hard to get to the source directory before copying. 211 212 Within the source folder, we skip any symlinks, but we first try to 213 resolve symlinks to the source root itself. 214 """ 215 os.mkdir(os.path.join(self.from_dir, 'fubar')) 216 from_symlink = os.path.join(self.tempdir, 'from_symlink') 217 os.symlink(self.from_dir, from_symlink) 218 219 logdir = site_sysinfo.logdir(from_symlink) 220 logdir.run(self.to_dir) 221 222 destination_path = self._destination_path('fubar') 223 self.assertTrue(os.path.exists(destination_path), 224 msg='Failed to copy to %s' % destination_path) 225 226 def test_run_excludes_common_patterns(self): 227 os.mkdir(os.path.join(self.from_dir, 'autoserv2344')) 228 # Create empty file. 229 open(os.path.join(self.from_dir, 'system.journal'), 'w').close() 230 deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix') 231 os.makedirs(os.path.join(self.from_dir, deeper_subdir)) 232 233 logdir = site_sysinfo.logdir(self.from_dir) 234 logdir.run(self.to_dir) 235 236 destination_path = self._destination_path('autoserv2344') 237 self.assertFalse(os.path.exists(destination_path), 238 msg='Copied banned file to %s' % destination_path) 239 destination_path = self._destination_path(deeper_subdir) 240 self.assertFalse(os.path.exists(destination_path), 241 msg='Copied banned file to %s' % destination_path) 242 destination_path = self._destination_path('system.journal') 243 self.assertFalse(os.path.exists(destination_path), 244 msg='Copied banned file to %s' % destination_path) 245 246 def test_run_ignores_exclude_patterns_in_leading_dirs(self): 247 """Exclude patterns should only be applied to path suffixes within 248 from_dir, not to the root directory itself. 249 """ 250 exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344') 251 os.makedirs(os.path.join(exclude_pattern_dir, 'fubar')) 252 logdir = site_sysinfo.logdir(exclude_pattern_dir) 253 logdir.run(self.to_dir) 254 destination_path = self._destination_path('fubar', 255 from_dir=exclude_pattern_dir) 256 self.assertTrue(os.path.exists(destination_path), 257 msg='Failed to copy to %s' % destination_path) 258 259 def test_pickle_unpickle_equal(self): 260 """Check pickle-unpickle round-trip.""" 261 logdir = site_sysinfo.logdir( 262 self.from_dir, 263 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',))) 264 # base_job uses protocol 2 to pickle. We follow suit. 265 logdir_pickle = pickle.dumps(logdir, protocol=2) 266 unpickled_logdir = pickle.loads(logdir_pickle) 267 268 self.assertEqual(unpickled_logdir, logdir) 269 270 def test_pickle_enforce_required_attributes(self): 271 """Some attributes from this object should never be dropped. 272 273 When running a client test against an older build, we pickle the objects 274 of this class from newer version of the class and unpicle to older 275 versions of the class. The older versions require some attributes to be 276 present. 277 """ 278 logdir = site_sysinfo.logdir( 279 self.from_dir, 280 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',))) 281 # base_job uses protocol 2 to pickle. We follow suit. 282 logdir_pickle = pickle.dumps(logdir, protocol=2) 283 logdir = pickle.loads(logdir_pickle) 284 285 self.assertEqual(logdir.additional_exclude, 'a') 286 287 def test_pickle_enforce_required_attributes_default(self): 288 """Some attributes from this object should never be dropped. 289 290 When running a client test against an older build, we pickle the objects 291 of this class from newer version of the class and unpicle to older 292 versions of the class. The older versions require some attributes to be 293 present. 294 """ 295 logdir = site_sysinfo.logdir(self.from_dir) 296 # base_job uses protocol 2 to pickle. We follow suit. 297 logdir_pickle = pickle.dumps(logdir, protocol=2) 298 logdir = pickle.loads(logdir_pickle) 299 300 self.assertEqual(logdir.additional_exclude, None) 301 302 def test_unpickle_handle_missing__excludes(self): 303 """Accurately handle missing _excludes attribute from pickles 304 305 This can happen when running brand new version of this class that 306 introduced this attribute from older server side code in prod. 307 """ 308 logdir = site_sysinfo.logdir(self.from_dir) 309 delattr(logdir, '_excludes') 310 # base_job uses protocol 2 to pickle. We follow suit. 311 logdir_pickle = pickle.dumps(logdir, protocol=2) 312 logdir = pickle.loads(logdir_pickle) 313 314 self.assertEqual(logdir._excludes, 315 site_sysinfo.logdir.DEFAULT_EXCLUDES) 316 317 def test_unpickle_handle_missing__excludes_default(self): 318 """Accurately handle missing _excludes attribute from pickles 319 320 This can happen when running brand new version of this class that 321 introduced this attribute from older server side code in prod. 322 """ 323 logdir = site_sysinfo.logdir( 324 self.from_dir, 325 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',))) 326 delattr(logdir, '_excludes') 327 # base_job uses protocol 2 to pickle. We follow suit. 328 logdir_pickle = pickle.dumps(logdir, protocol=2) 329 logdir = pickle.loads(logdir_pickle) 330 331 self.assertEqual( 332 logdir._excludes, 333 (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',))) 334 335 336if __name__ == '__main__': 337 unittest.main() 338