1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6 7from __future__ import print_function 8from __future__ import division 9from __future__ import absolute_import 10 11import dbus, logging, os, stat 12from dbus.mainloop.glib import DBusGMainLoop 13# AU tests use ToT client code, but ToT -3 client version. 14try: 15 from gi.repository import GObject 16except ImportError: 17 import gobject as GObject 18import six 19from six.moves import zip 20 21import common 22 23from autotest_lib.client.bin import utils 24from autotest_lib.client.common_lib import autotemp, error 25from autotest_lib.client.cros import dbus_util 26from autotest_lib.client.cros.mainloop import ExceptionForward 27from autotest_lib.client.cros.mainloop import GenericTesterMainLoop 28 29 30"""This module contains several helper classes for writing tests to verify the 31CrosDisks DBus interface. In particular, the CrosDisksTester class can be used 32to derive functional tests that interact with the CrosDisks server over DBus. 33""" 34 35 36class ExceptionSuppressor(object): 37 """A context manager class for suppressing certain types of exception. 38 39 An instance of this class is expected to be used with the with statement 40 and takes a set of exception classes at instantiation, which are types of 41 exception to be suppressed (and logged) in the code block under the with 42 statement. 43 44 Example: 45 46 with ExceptionSuppressor(OSError, IOError): 47 # An exception, which is a sub-class of OSError or IOError, is 48 # suppressed in the block code under the with statement. 49 """ 50 def __init__(self, *args): 51 self.__suppressed_exc_types = (args) 52 53 def __enter__(self): 54 return self 55 56 def __exit__(self, exc_type, exc_value, traceback): 57 if exc_type and issubclass(exc_type, self.__suppressed_exc_types): 58 try: 59 logging.exception('Suppressed exception: %s(%s)', 60 exc_type, exc_value) 61 except Exception: 62 pass 63 return True 64 return False 65 66 67class DBusClient(object): 68 """ A base class of a DBus proxy client to test a DBus server. 69 70 This class is expected to be used along with a GObject main loop and provides 71 some convenient functions for testing the DBus API exposed by a DBus server. 72 """ 73 74 def __init__(self, main_loop, bus, bus_name, object_path, timeout=None): 75 """Initializes the instance. 76 77 Args: 78 main_loop: The GObject main loop. 79 bus: The bus where the DBus server is connected to. 80 bus_name: The bus name owned by the DBus server. 81 object_path: The object path of the DBus server. 82 timeout: Maximum time in seconds to wait for the DBus connection. 83 """ 84 self.__signal_content = {} 85 self.main_loop = main_loop 86 self.signal_timeout_in_seconds = 10 87 logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"', 88 bus_name, object_path) 89 self.proxy_object = dbus_util.get_dbus_object(bus, bus_name, 90 object_path, timeout) 91 92 def clear_signal_content(self, signal_name): 93 """Clears the content of the signal. 94 95 Args: 96 signal_name: The name of the signal. 97 """ 98 if signal_name in self.__signal_content: 99 self.__signal_content[signal_name] = None 100 101 def get_signal_content(self, signal_name): 102 """Gets the content of a signal. 103 104 Args: 105 signal_name: The name of the signal. 106 107 Returns: 108 The content of a signal or None if the signal is not being handled. 109 """ 110 return self.__signal_content.get(signal_name) 111 112 def handle_signal(self, interface, signal_name, argument_names=()): 113 """Registers a signal handler to handle a given signal. 114 115 Args: 116 interface: The DBus interface of the signal. 117 signal_name: The name of the signal. 118 argument_names: A list of argument names that the signal contains. 119 """ 120 if signal_name in self.__signal_content: 121 return 122 123 self.__signal_content[signal_name] = None 124 125 def signal_handler(*args): 126 self.__signal_content[signal_name] = dict(zip(argument_names, args)) 127 128 logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"', 129 signal_name, ', '.join(argument_names), interface) 130 self.proxy_object.connect_to_signal(signal_name, signal_handler, 131 interface) 132 133 def wait_for_signal(self, signal_name): 134 """Waits for the reception of a signal. 135 136 Args: 137 signal_name: The name of the signal to wait for. 138 139 Returns: 140 The content of the signal. 141 """ 142 if signal_name not in self.__signal_content: 143 return None 144 145 def check_signal_content(): 146 context = self.main_loop.get_context() 147 while context.iteration(False): 148 pass 149 return self.__signal_content[signal_name] is not None 150 151 logging.debug('Waiting for D-Bus signal "%s"', signal_name) 152 utils.poll_for_condition(condition=check_signal_content, 153 desc='%s signal' % signal_name, 154 timeout=self.signal_timeout_in_seconds) 155 content = self.__signal_content[signal_name] 156 logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content) 157 self.__signal_content[signal_name] = None 158 return content 159 160 def expect_signal(self, signal_name, expected_content): 161 """Waits the the reception of a signal and verifies its content. 162 163 Args: 164 signal_name: The name of the signal to wait for. 165 expected_content: The expected content of the signal, which can be 166 partially specified. Only specified fields are 167 compared between the actual and expected content. 168 169 Returns: 170 The actual content of the signal. 171 172 Raises: 173 error.TestFail: A test failure when there is a mismatch between the 174 actual and expected content of the signal. 175 """ 176 actual_content = self.wait_for_signal(signal_name) 177 logging.debug("%s signal: expected=%s actual=%s", 178 signal_name, expected_content, actual_content) 179 for argument, expected_value in six.iteritems(expected_content): 180 if argument not in actual_content: 181 raise error.TestFail( 182 ('%s signal missing "%s": expected=%s, actual=%s') % 183 (signal_name, argument, expected_content, actual_content)) 184 185 if actual_content[argument] != expected_value: 186 raise error.TestFail( 187 ('%s signal not matched on "%s": expected=%s, actual=%s') % 188 (signal_name, argument, expected_content, actual_content)) 189 return actual_content 190 191 192class CrosDisksClient(DBusClient): 193 """A DBus proxy client for testing the CrosDisks DBus server. 194 """ 195 196 CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks' 197 CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks' 198 CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks' 199 DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties' 200 FORMAT_COMPLETED_SIGNAL = 'FormatCompleted' 201 FORMAT_COMPLETED_SIGNAL_ARGUMENTS = ( 202 'status', 'path' 203 ) 204 MOUNT_COMPLETED_SIGNAL = 'MountCompleted' 205 MOUNT_COMPLETED_SIGNAL_ARGUMENTS = ( 206 'status', 'source_path', 'source_type', 'mount_path' 207 ) 208 RENAME_COMPLETED_SIGNAL = 'RenameCompleted' 209 RENAME_COMPLETED_SIGNAL_ARGUMENTS = ( 210 'status', 'path' 211 ) 212 213 def __init__(self, main_loop, bus, timeout_seconds=None): 214 """Initializes the instance. 215 216 Args: 217 main_loop: The GObject main loop. 218 bus: The bus where the DBus server is connected to. 219 timeout_seconds: Maximum time in seconds to wait for the DBus 220 connection. 221 """ 222 super(CrosDisksClient, self).__init__(main_loop, bus, 223 self.CROS_DISKS_BUS_NAME, 224 self.CROS_DISKS_OBJECT_PATH, 225 timeout_seconds) 226 self.interface = dbus.Interface(self.proxy_object, 227 self.CROS_DISKS_INTERFACE) 228 self.properties = dbus.Interface(self.proxy_object, 229 self.DBUS_PROPERTIES_INTERFACE) 230 self.handle_signal(self.CROS_DISKS_INTERFACE, 231 self.FORMAT_COMPLETED_SIGNAL, 232 self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS) 233 self.handle_signal(self.CROS_DISKS_INTERFACE, 234 self.MOUNT_COMPLETED_SIGNAL, 235 self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS) 236 self.handle_signal(self.CROS_DISKS_INTERFACE, 237 self.RENAME_COMPLETED_SIGNAL, 238 self.RENAME_COMPLETED_SIGNAL_ARGUMENTS) 239 240 def enumerate_devices(self): 241 """Invokes the CrosDisks EnumerateMountableDevices method. 242 243 Returns: 244 A list of sysfs paths of devices that are recognized by 245 CrosDisks. 246 """ 247 return self.interface.EnumerateDevices() 248 249 def get_device_properties(self, path): 250 """Invokes the CrosDisks GetDeviceProperties method. 251 252 Args: 253 path: The device path. 254 255 Returns: 256 The properties of the device in a dictionary. 257 """ 258 return self.interface.GetDeviceProperties(path) 259 260 def format(self, path, filesystem_type=None, options=None): 261 """Invokes the CrosDisks Format method. 262 263 Args: 264 path: The device path to format. 265 filesystem_type: The filesystem type used for formatting the device. 266 options: A list of options used for formatting the device. 267 """ 268 if filesystem_type is None: 269 filesystem_type = '' 270 if options is None: 271 options = [] 272 self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL) 273 self.interface.Format(path, filesystem_type, 274 dbus.Array(options, signature='s')) 275 276 def wait_for_format_completion(self): 277 """Waits for the CrosDisks FormatCompleted signal. 278 279 Returns: 280 The content of the FormatCompleted signal. 281 """ 282 return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL) 283 284 def expect_format_completion(self, expected_content): 285 """Waits and verifies for the CrosDisks FormatCompleted signal. 286 287 Args: 288 expected_content: The expected content of the FormatCompleted 289 signal, which can be partially specified. 290 Only specified fields are compared between the 291 actual and expected content. 292 293 Returns: 294 The actual content of the FormatCompleted signal. 295 296 Raises: 297 error.TestFail: A test failure when there is a mismatch between the 298 actual and expected content of the FormatCompleted 299 signal. 300 """ 301 return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL, 302 expected_content) 303 304 def rename(self, path, volume_name=None): 305 """Invokes the CrosDisks Rename method. 306 307 Args: 308 path: The device path to rename. 309 volume_name: The new name used for renaming. 310 """ 311 if volume_name is None: 312 volume_name = '' 313 self.clear_signal_content(self.RENAME_COMPLETED_SIGNAL) 314 self.interface.Rename(path, volume_name) 315 316 def wait_for_rename_completion(self): 317 """Waits for the CrosDisks RenameCompleted signal. 318 319 Returns: 320 The content of the RenameCompleted signal. 321 """ 322 return self.wait_for_signal(self.RENAME_COMPLETED_SIGNAL) 323 324 def expect_rename_completion(self, expected_content): 325 """Waits and verifies for the CrosDisks RenameCompleted signal. 326 327 Args: 328 expected_content: The expected content of the RenameCompleted 329 signal, which can be partially specified. 330 Only specified fields are compared between the 331 actual and expected content. 332 333 Returns: 334 The actual content of the RenameCompleted signal. 335 336 Raises: 337 error.TestFail: A test failure when there is a mismatch between the 338 actual and expected content of the RenameCompleted 339 signal. 340 """ 341 return self.expect_signal(self.RENAME_COMPLETED_SIGNAL, 342 expected_content) 343 344 def mount(self, path, filesystem_type=None, options=None): 345 """Invokes the CrosDisks Mount method. 346 347 Args: 348 path: The device path to mount. 349 filesystem_type: The filesystem type used for mounting the device. 350 options: A list of options used for mounting the device. 351 """ 352 if filesystem_type is None: 353 filesystem_type = '' 354 if options is None: 355 options = [] 356 self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL) 357 self.interface.Mount(path, filesystem_type, 358 dbus.Array(options, signature='s')) 359 360 def unmount(self, path, options=None): 361 """Invokes the CrosDisks Unmount method. 362 363 Args: 364 path: The device or mount path to unmount. 365 options: A list of options used for unmounting the path. 366 367 Returns: 368 The mount error code. 369 """ 370 if options is None: 371 options = [] 372 return self.interface.Unmount(path, dbus.Array(options, signature='s')) 373 374 def wait_for_mount_completion(self): 375 """Waits for the CrosDisks MountCompleted signal. 376 377 Returns: 378 The content of the MountCompleted signal. 379 """ 380 return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL) 381 382 def expect_mount_completion(self, expected_content): 383 """Waits and verifies for the CrosDisks MountCompleted signal. 384 385 Args: 386 expected_content: The expected content of the MountCompleted 387 signal, which can be partially specified. 388 Only specified fields are compared between the 389 actual and expected content. 390 391 Returns: 392 The actual content of the MountCompleted signal. 393 394 Raises: 395 error.TestFail: A test failure when there is a mismatch between the 396 actual and expected content of the MountCompleted 397 signal. 398 """ 399 return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL, 400 expected_content) 401 402 def add_loopback_to_allowlist(self, path): 403 """Adds a device by its path to the allowlist for testing. 404 405 Args: 406 path: path to the /dev/loopX device. 407 """ 408 sys_path = '/sys/devices/virtual/block/' + os.path.basename(path) 409 self.interface.AddDeviceToAllowlist(sys_path) 410 411 def remove_loopback_from_allowlist(self, path): 412 """Removes a device by its sys path from the allowlist for testing. 413 414 Args: 415 path: path to the /dev/loopX device. 416 """ 417 sys_path = '/sys/devices/virtual/block/' + os.path.basename(path) 418 self.interface.RemoveDeviceFromAllowlist(sys_path) 419 420 421class CrosDisksTester(GenericTesterMainLoop): 422 """A base tester class for testing the CrosDisks server. 423 424 A derived class should override the get_tests method to return a list of 425 test methods. The perform_one_test method invokes each test method in the 426 list to verify some functionalities of CrosDisks server. 427 """ 428 def __init__(self, test): 429 bus_loop = DBusGMainLoop(set_as_default=True) 430 self.bus = dbus.SystemBus(mainloop=bus_loop) 431 self.main_loop = GObject.MainLoop() 432 super(CrosDisksTester, self).__init__(test, self.main_loop) 433 self.cros_disks = CrosDisksClient(self.main_loop, self.bus) 434 435 def get_tests(self): 436 """Returns a list of test methods to be invoked by perform_one_test. 437 438 A derived class should override this method. 439 440 Returns: 441 A list of test methods. 442 """ 443 return [] 444 445 @ExceptionForward 446 def perform_one_test(self): 447 """Exercises each test method in the list returned by get_tests. 448 """ 449 tests = self.get_tests() 450 self.remaining_requirements = set([test.__name__ for test in tests]) 451 for test in tests: 452 test() 453 self.requirement_completed(test.__name__) 454 455 def reconnect_client(self, timeout_seconds=None): 456 """"Reconnect the CrosDisks DBus client. 457 458 Args: 459 timeout_seconds: Maximum time in seconds to wait for the DBus 460 connection. 461 """ 462 self.cros_disks = CrosDisksClient(self.main_loop, self.bus, 463 timeout_seconds) 464 465 466class FilesystemTestObject(object): 467 """A base class to represent a filesystem test object. 468 469 A filesystem test object can be a file, directory or symbolic link. 470 A derived class should override the _create and _verify method to implement 471 how the test object should be created and verified, respectively, on a 472 filesystem. 473 """ 474 def __init__(self, path, content, mode): 475 """Initializes the instance. 476 477 Args: 478 path: The relative path of the test object. 479 content: The content of the test object. 480 mode: The file permissions given to the test object. 481 """ 482 self._path = path 483 self._content = content 484 self._mode = mode 485 486 def create(self, base_dir): 487 """Creates the test object in a base directory. 488 489 Args: 490 base_dir: The base directory where the test object is created. 491 492 Returns: 493 True if the test object is created successfully or False otherwise. 494 """ 495 if not self._create(base_dir): 496 logging.debug('Failed to create filesystem test object at "%s"', 497 os.path.join(base_dir, self._path)) 498 return False 499 return True 500 501 def verify(self, base_dir): 502 """Verifies the test object in a base directory. 503 504 Args: 505 base_dir: The base directory where the test object is expected to be 506 found. 507 508 Returns: 509 True if the test object is found in the base directory and matches 510 the expected content, or False otherwise. 511 """ 512 if not self._verify(base_dir): 513 logging.error('Mismatched filesystem object at "%s"', 514 os.path.join(base_dir, self._path)) 515 return False 516 return True 517 518 def _create(self, base_dir): 519 return False 520 521 def _verify(self, base_dir): 522 return False 523 524 525class FilesystemTestDirectory(FilesystemTestObject): 526 """A filesystem test object that represents a directory.""" 527 528 def __init__(self, path, content, mode=stat.S_IRWXU|stat.S_IRGRP| \ 529 stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH, strict=False): 530 """Initializes the directory. 531 532 Args: 533 path: The name of this directory. 534 content: The list of items in this directory. 535 mode: The file permissions given to this directory. 536 strict: Whether verify() strictly compares directory contents for 537 equality. This flag only applies to this directory, and not 538 to any child directories. 539 """ 540 super(FilesystemTestDirectory, self).__init__(path, content, mode) 541 self._strict = strict 542 543 def _create(self, base_dir): 544 path = os.path.join(base_dir, self._path) if self._path else base_dir 545 546 if self._path: 547 with ExceptionSuppressor(OSError): 548 os.makedirs(path) 549 os.chmod(path, self._mode) 550 551 if not os.path.isdir(path): 552 return False 553 554 for content in self._content: 555 if not content.create(path): 556 return False 557 558 return True 559 560 def _verify(self, base_dir): 561 path = os.path.join(base_dir, self._path) if self._path else base_dir 562 if not os.path.isdir(path): 563 return False 564 565 result = True 566 seen = set() 567 568 for content in self._content: 569 if not content.verify(path): 570 result = False 571 seen.add(content._path) 572 573 if self._strict: 574 for child in os.listdir(path): 575 if child not in seen: 576 logging.error('Unexpected filesystem entry "%s"', 577 os.path.join(path, child)) 578 result = False 579 580 return result 581 582 583class FilesystemTestFile(FilesystemTestObject): 584 """A filesystem test object that represents a file.""" 585 586 def __init__(self, 587 path, 588 content, 589 mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP \ 590 | stat.S_IROTH, 591 mtime=None): 592 """Initializes the file. 593 594 Args: 595 path: The name of this file. 596 content: A byte string with the expected file contents. 597 mode: The file permissions given to this file. 598 mtime: If set, the expected file modification timestamp. 599 """ 600 super(FilesystemTestFile, self).__init__(path, content, mode) 601 self._mtime = mtime 602 603 def _create(self, base_dir): 604 path = os.path.join(base_dir, self._path) 605 with ExceptionSuppressor(IOError): 606 with open(path, 'wb+') as f: 607 f.write(self._content) 608 with ExceptionSuppressor(OSError): 609 os.chmod(path, self._mode) 610 return True 611 return False 612 613 def _verify(self, base_dir): 614 path = os.path.join(base_dir, self._path) 615 with ExceptionSuppressor(IOError): 616 result = True 617 618 if self._content is not None: 619 with open(path, 'rb') as f: 620 if f.read() != self._content: 621 logging.error('Mismatched file contents for "%s"', 622 path) 623 result = False 624 625 if self._mtime is not None: 626 st = os.stat(path) 627 if st.st_mtime != self._mtime: 628 logging.error( 629 'Mismatched file modification time for "%s": ' + 630 'want %d, got %d', path, self._mtime, st.st_mtime) 631 result = False 632 633 return result 634 635 return False 636 637 638class DefaultFilesystemTestContent(FilesystemTestDirectory): 639 def __init__(self): 640 super(DefaultFilesystemTestContent, self).__init__('', [ 641 FilesystemTestFile('file1', '0123456789'), 642 FilesystemTestDirectory('dir1', [ 643 FilesystemTestFile('file1', ''), 644 FilesystemTestFile('file2', 'abcdefg'), 645 FilesystemTestDirectory('dir2', [ 646 FilesystemTestFile('file3', 'abcdefg'), 647 FilesystemTestFile('file4', 'a' * 65536), 648 ]), 649 ]), 650 ], stat.S_IRWXU|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH) 651 652 653class VirtualFilesystemImage(object): 654 def __init__(self, block_size, block_count, filesystem_type, 655 *args, **kwargs): 656 """Initializes the instance. 657 658 Args: 659 block_size: The number of bytes of each block in the image. 660 block_count: The number of blocks in the image. 661 filesystem_type: The filesystem type to be given to the mkfs 662 program for formatting the image. 663 664 Keyword Args: 665 mount_filesystem_type: The filesystem type to be given to the 666 mount program for mounting the image. 667 mkfs_options: A list of options to be given to the mkfs program. 668 """ 669 self._block_size = block_size 670 self._block_count = block_count 671 self._filesystem_type = filesystem_type 672 self._mount_filesystem_type = kwargs.get('mount_filesystem_type') 673 if self._mount_filesystem_type is None: 674 self._mount_filesystem_type = filesystem_type 675 self._mkfs_options = kwargs.get('mkfs_options') 676 if self._mkfs_options is None: 677 self._mkfs_options = [] 678 self._image_file = None 679 self._loop_device = None 680 self._loop_device_stat = None 681 self._mount_dir = None 682 683 def __del__(self): 684 with ExceptionSuppressor(Exception): 685 self.clean() 686 687 def __enter__(self): 688 self.create() 689 return self 690 691 def __exit__(self, exc_type, exc_value, traceback): 692 self.clean() 693 return False 694 695 def _remove_temp_path(self, temp_path): 696 """Removes a temporary file or directory created using autotemp.""" 697 if temp_path: 698 with ExceptionSuppressor(Exception): 699 path = temp_path.name 700 temp_path.clean() 701 logging.debug('Removed "%s"', path) 702 703 def _remove_image_file(self): 704 """Removes the image file if one has been created.""" 705 self._remove_temp_path(self._image_file) 706 self._image_file = None 707 708 def _remove_mount_dir(self): 709 """Removes the mount directory if one has been created.""" 710 self._remove_temp_path(self._mount_dir) 711 self._mount_dir = None 712 713 @property 714 def image_file(self): 715 """Gets the path of the image file. 716 717 Returns: 718 The path of the image file or None if no image file has been 719 created. 720 """ 721 return self._image_file.name if self._image_file else None 722 723 @property 724 def loop_device(self): 725 """Gets the loop device where the image file is attached to. 726 727 Returns: 728 The path of the loop device where the image file is attached to or 729 None if no loop device is attaching the image file. 730 """ 731 return self._loop_device 732 733 @property 734 def mount_dir(self): 735 """Gets the directory where the image file is mounted to. 736 737 Returns: 738 The directory where the image file is mounted to or None if no 739 mount directory has been created. 740 """ 741 return self._mount_dir.name if self._mount_dir else None 742 743 def create(self): 744 """Creates a zero-filled image file with the specified size. 745 746 The created image file is temporary and removed when clean() 747 is called. 748 """ 749 self.clean() 750 self._image_file = autotemp.tempfile(unique_id='fsImage') 751 try: 752 logging.debug('Creating zero-filled image file at "%s"', 753 self._image_file.name) 754 utils.run('dd if=/dev/zero of=%s bs=%s count=%s' % 755 (self._image_file.name, self._block_size, 756 self._block_count)) 757 except error.CmdError as exc: 758 self._remove_image_file() 759 message = 'Failed to create filesystem image: %s' % exc 760 raise RuntimeError(message) 761 762 def clean(self): 763 """Removes the image file if one has been created. 764 765 Before removal, the image file is detached from the loop device that 766 it is attached to. 767 """ 768 self.detach_from_loop_device() 769 self._remove_image_file() 770 771 def attach_to_loop_device(self): 772 """Attaches the created image file to a loop device. 773 774 Creates the image file, if one has not been created, by calling 775 create(). 776 777 Returns: 778 The path of the loop device where the image file is attached to. 779 """ 780 if self._loop_device: 781 return self._loop_device 782 783 if not self._image_file: 784 self.create() 785 786 logging.debug('Attaching image file "%s" to loop device', 787 self._image_file.name) 788 utils.run('losetup -f %s' % self._image_file.name) 789 output = utils.system_output('losetup -j %s' % self._image_file.name) 790 # output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)" 791 self._loop_device = output.split(':')[0] 792 logging.debug('Attached image file "%s" to loop device "%s"', 793 self._image_file.name, self._loop_device) 794 795 self._loop_device_stat = os.stat(self._loop_device) 796 logging.debug('Loop device "%s" (uid=%d, gid=%d, permissions=%04o)', 797 self._loop_device, 798 self._loop_device_stat.st_uid, 799 self._loop_device_stat.st_gid, 800 stat.S_IMODE(self._loop_device_stat.st_mode)) 801 return self._loop_device 802 803 def detach_from_loop_device(self): 804 """Detaches the image file from the loop device.""" 805 if not self._loop_device: 806 return 807 808 self.unmount() 809 810 logging.debug('Cleaning up remaining mount points of loop device "%s"', 811 self._loop_device) 812 utils.run('umount -f %s' % self._loop_device, ignore_status=True) 813 814 logging.debug('Restore ownership/permissions of loop device "%s"', 815 self._loop_device) 816 os.chmod(self._loop_device, 817 stat.S_IMODE(self._loop_device_stat.st_mode)) 818 os.chown(self._loop_device, 819 self._loop_device_stat.st_uid, self._loop_device_stat.st_gid) 820 821 logging.debug('Detaching image file "%s" from loop device "%s"', 822 self._image_file.name, self._loop_device) 823 utils.run('losetup -d %s' % self._loop_device) 824 self._loop_device = None 825 826 def format(self): 827 """Formats the image file as the specified filesystem.""" 828 self.attach_to_loop_device() 829 try: 830 logging.debug('Formatting image file at "%s" as "%s" filesystem', 831 self._image_file.name, self._filesystem_type) 832 utils.run('yes | mkfs -t %s %s %s' % 833 (self._filesystem_type, ' '.join(self._mkfs_options), 834 self._loop_device)) 835 logging.debug('blkid: %s', utils.system_output( 836 'blkid -c /dev/null %s' % self._loop_device, 837 ignore_status=True)) 838 except error.CmdError as exc: 839 message = 'Failed to format filesystem image: %s' % exc 840 raise RuntimeError(message) 841 842 def mount(self, options=None): 843 """Mounts the image file to a directory. 844 845 Args: 846 options: An optional list of mount options. 847 """ 848 if self._mount_dir: 849 return self._mount_dir.name 850 851 if options is None: 852 options = [] 853 854 options_arg = ','.join(options) 855 if options_arg: 856 options_arg = '-o ' + options_arg 857 858 self.attach_to_loop_device() 859 self._mount_dir = autotemp.tempdir(unique_id='fsImage') 860 try: 861 logging.debug('Mounting image file "%s" (%s) to directory "%s"', 862 self._image_file.name, self._loop_device, 863 self._mount_dir.name) 864 utils.run('mount -t %s %s %s %s' % 865 (self._mount_filesystem_type, options_arg, 866 self._loop_device, self._mount_dir.name)) 867 except error.CmdError as exc: 868 self._remove_mount_dir() 869 message = ('Failed to mount virtual filesystem image "%s": %s' % 870 (self._image_file.name, exc)) 871 raise RuntimeError(message) 872 return self._mount_dir.name 873 874 def unmount(self): 875 """Unmounts the image file from the mounted directory.""" 876 if not self._mount_dir: 877 return 878 879 try: 880 logging.debug('Unmounting image file "%s" (%s) from directory "%s"', 881 self._image_file.name, self._loop_device, 882 self._mount_dir.name) 883 utils.run('umount %s' % self._mount_dir.name) 884 except error.CmdError as exc: 885 message = ('Failed to unmount virtual filesystem image "%s": %s' % 886 (self._image_file.name, exc)) 887 raise RuntimeError(message) 888 finally: 889 self._remove_mount_dir() 890 891 def get_volume_label(self): 892 """Gets volume name information of |self._loop_device| 893 894 @return a string with volume name if it exists. 895 """ 896 # This script is run as root in a normal autotest run, 897 # so this works: It doesn't have access to the necessary info 898 # when run as a non-privileged user 899 cmd = "blkid -c /dev/null -o udev %s" % self._loop_device 900 output = utils.system_output(cmd, ignore_status=True) 901 902 for line in output.splitlines(): 903 udev_key, udev_val = line.split('=') 904 905 if udev_key == 'ID_FS_LABEL': 906 return udev_val 907 908 return None 909